Let’s Get Esper Up & Running

For our CEP in the Cloud example, I’ve briefly outlined a stream-based load balancing idea. In this example, RuleBots (pieces of code that do something; think of the procedural extensions built into some vendors’ products) send utilization statistics to the CEP Load Balancer via RabbitMQ. The CEP Load Balancer is written in Java and uses Esper to create a stream containing all available processes (destinations) in a particular service pool.

What’s a Service Pool?

For this example, we define a service pool as the set of cloud compute resources available to perform a particular service. Because we’re implementing map/reduce with the classic word count tutorial, our service pool is named “WordCountMap.”

How Does It Work?

Our Twitter OnRamp, which listens to the Twitter stream, publishes each tweet via RabbitMQ. It doesn’t hard-code the destination queue; it retrieves the queue to publish to from the CEP Load Balancer.
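
To make that flow concrete, here’s a minimal plain-Java sketch. Every name in it (LoadBalancer, Publisher, TwitterOnRamp, destinationFor) is made up for illustration, and an in-memory map stands in for RabbitMQ so the sketch runs without a broker; a real OnRamp would presumably publish through a RabbitMQ client instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout: none of these types come from the post.
final class OnRampSketch {

    /** Stand-in for the CEP Load Balancer: maps a service pool to a queue name. */
    interface LoadBalancer {
        String destinationFor(String servicePool);
    }

    /** Stand-in for a RabbitMQ publisher (assumption: publish by queue name). */
    interface Publisher {
        void publish(String queue, String message);
    }

    /** In-memory publisher that records messages per queue. */
    static final class InMemoryPublisher implements Publisher {
        final Map<String, List<String>> queues = new HashMap<>();

        @Override
        public void publish(String queue, String message) {
            queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(message);
        }
    }

    /** The OnRamp holds no balancing logic; it only asks where to send. */
    static final class TwitterOnRamp {
        private final LoadBalancer balancer;
        private final Publisher publisher;
        private final String servicePool;

        TwitterOnRamp(LoadBalancer balancer, Publisher publisher, String servicePool) {
            this.balancer = balancer;
            this.publisher = publisher;
            this.servicePool = servicePool;
        }

        void onTweet(String tweet) {
            // Ask the balancer for a destination queue, then publish to it.
            String queue = balancer.destinationFor(servicePool);
            publisher.publish(queue, tweet);
        }
    }

    public static void main(String[] args) {
        InMemoryPublisher publisher = new InMemoryPublisher();
        LoadBalancer alwaysMap01 = pool -> "MAP01"; // trivial balancer for the demo
        TwitterOnRamp onRamp = new TwitterOnRamp(alwaysMap01, publisher, "WordCountMap");
        onRamp.onTweet("hello world");
        System.out.println(publisher.queues); // prints {MAP01=[hello world]}
    }
}
```

Swapping in a different LoadBalancer changes where tweets go without touching TwitterOnRamp.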

Why Is This Cool?

Because the CEP Load Balancer doesn’t need to know much about any particular process, only that it’s available to do work. Also, the Twitter OnRamp doesn’t have to embody a load balancing algorithm, so if we decide to change how we balance load in our word count example, we make the change in one place. And by avoiding partitioning at the RuleBot level, we keep some flexibility. More processing power required? Add processes; there’s no need to stop services, re-partition, and restart them. Easy peasy!
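
One way to picture this: the balancer tracks only the latest load each destination has reported, and a new process joins the pool simply by reporting in. The sketch below assumes a least-loaded policy, which this post doesn’t prescribe, and uses a plain map rather than an Esper stream; UtilizationBoard, report, and leastLoaded are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a plain-Java stand-in, not the Esper-based balancer.
final class UtilizationBoard {

    // servicePool -> (destination -> most recently reported load)
    private final Map<String, Map<String, Double>> latest = new ConcurrentHashMap<>();

    /** Records a reading; an unseen destination joins its pool automatically. */
    void report(String servicePool, String destination, double load) {
        latest.computeIfAbsent(servicePool, p -> new ConcurrentHashMap<>())
              .put(destination, load);
    }

    /** Returns the destination with the lowest latest load, if the pool has any. */
    Optional<String> leastLoaded(String servicePool) {
        Map<String, Double> pool = latest.get(servicePool);
        if (pool == null) {
            return Optional.empty();
        }
        String best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> e : pool.entrySet()) {
            double load = e.getValue();
            // Break ties by destination name so the result is deterministic.
            boolean tieWin = load == bestLoad
                    && (best == null || e.getKey().compareTo(best) < 0);
            if (load < bestLoad || tieWin) {
                best = e.getKey();
                bestLoad = load;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        UtilizationBoard board = new UtilizationBoard();
        board.report("WordCountMap", "MAP01", 7.0);
        board.report("WordCountMap", "MAP02", 3.0);
        System.out.println(board.leastLoaded("WordCountMap").get()); // prints MAP02

        // A third process comes online: no restart, no re-partitioning.
        board.report("WordCountMap", "MAP03", 1.0);
        System.out.println(board.leastLoaded("WordCountMap").get()); // prints MAP03
    }
}
```

Changing the policy (say, to round robin) would mean editing leastLoaded and nothing else.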

Esper Code

Here’s some example Esper code to get started. It registers a Utilization event type, runs the EPL statement select * from Util.win:length(5), and feeds it random load readings for two destinations, MAP01 and MAP02. We’ll modify it over time; in the next installment, we’ll change it to receive utilization statistics from RuleBots. (If anyone knows a better way to format code in my posts, let me know; my HTML ‘code’ formatting doesn’t seem to be working in WordPress 2.9.2.)

import com.espertech.esper.client.*;

import java.util.Date;
import java.util.Random;

import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;

public class CEPLoadBalancer {

    // Event type: one utilization reading from one destination in a service pool.
    public static class Utilization {
        private final String destination;
        private final String servicePool;
        private final Double load;
        private final Date timeStamp;

        public Utilization(String destination, String servicePool, Double load, long timeMillis) {
            this.destination = destination;
            this.servicePool = servicePool;
            this.load = load;
            this.timeStamp = new Date(timeMillis);
        }

        // Esper reads event properties through these JavaBean-style getters.
        public String getDestination() { return destination; }
        public String getServicePool() { return servicePool; }
        public Double getLoad() { return load; }
        public Date getTimeStamp() { return timeStamp; }

        @Override
        public String toString() {
            return "Destination: " + destination + " ServicePool: " + servicePool
                    + " Load " + load + " Time " + timeStamp;
        }
    }

    private static final Random generator = new Random();

    // Sends one simulated reading (load 0-9) for each of the two map servers.
    public static void generateRandomUtilization(EPRuntime cepRT) {

        // first map server
        double load = generator.nextInt(10);
        long timeStamp = System.currentTimeMillis();
        String destination = "MAP01";
        String servicePool = "WordCountMap";
        // create utilization event
        Utilization util = new Utilization(destination, servicePool, load, timeStamp);
        cepRT.sendEvent(util);

        // second map server
        destination = "MAP02";
        load = generator.nextInt(10);
        timeStamp = System.currentTimeMillis();
        util = new Utilization(destination, servicePool, load, timeStamp);
        cepRT.sendEvent(util);
    }

    // Prints every event the statement delivers.
    public static class CEPListener implements UpdateListener {

        public void update(EventBean[] newData, EventBean[] oldData) {
            if (newData == null) {
                return; // nothing new in this update
            }
            for (EventBean event : newData) {
                System.out.println("Event received: " + event.getUnderlying());
            }
        }
    }

    public static void main(String[] args) {
        // Minimal log4j setup: warnings and above go to the console.
        ConsoleAppender appender = new ConsoleAppender(new SimpleLayout());
        Logger.getRootLogger().addAppender(appender);
        Logger.getRootLogger().setLevel(Level.WARN);

        // Register the event class under the alias "Util" and start the engine.
        Configuration cepConfig = new Configuration();
        cepConfig.addEventType("Util", Utilization.class.getName());
        EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig);
        EPRuntime cepRT = cep.getEPRuntime();

        // Keep a length window over the five most recent Util events.
        EPAdministrator cepAdm = cep.getEPAdministrator();
        EPStatement cepStatement = cepAdm.createEPL("select * from Util.win:length(5)");

        cepStatement.addListener(new CEPListener());

        // simulate loads
        for (int i = 0; i < 10; i++) {
            generateRandomUtilization(cepRT);
        }
    }
}
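
The EPL statement in the listing, select * from Util.win:length(5), keeps only the five most recent Util events. As a rough mental model (not how Esper implements it), a length window behaves like a bounded queue: each arrival is appended, and once the window is full the oldest event is pushed out. The events pushed out correspond to what Esper calls the remove stream. LengthWindow and its methods are hypothetical names for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of length-window behavior; not Esper code.
final class LengthWindow<T> {
    private final int capacity;
    private final Deque<T> events = new ArrayDeque<>();

    LengthWindow(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds an event; returns the event pushed out of the window, if any. */
    Optional<T> add(T event) {
        events.addLast(event);
        if (events.size() > capacity) {
            return Optional.of(events.removeFirst());
        }
        return Optional.empty();
    }

    /** Snapshot of the window, oldest event first. */
    List<T> contents() {
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        LengthWindow<Integer> window = new LengthWindow<>(5);
        for (int i = 1; i <= 7; i++) {
            window.add(i);
        }
        System.out.println(window.contents()); // prints [3, 4, 5, 6, 7]
    }
}
```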

More Stories By Colin Clark

Colin Clark is the CTO for Cloud Event Processing, Inc. and is widely regarded as a thought leader and pioneer in both Complex Event Processing and its application within Capital Markets.

Follow Colin on Twitter at http://twitter.com/EventCloudPro to learn more about cloud-based event processing using map/reduce, complex event processing, and event-driven pattern matching agents. You can also send topic suggestions or questions to [email protected]