Hello, this is a little contribution to camel projet.
Sometime you need to regulate (slow down) the flow of a stream.
For example, imagine a marketdata flow where body of message is a
Map<String, Object>. Update rate is far too important for your need
(sometime more thant 20 mess/sec), so you want to regulate it at 1mess/sec.
As messages are not delayed but aggregated, this flowregulator need a
MessageAggregator strategy (implementation given by a factory).
This implementation is based on the DelayQueue from java.util.concurrent.
ex: from("timer://test?period=1000").to("flowregulator://test?period=5000");
/**
* This camel component is able to regulate the flow of a route.
* <p>FlowRegulatorComponent uses a period parameter to trigger updates.
* <p>First input message is send immediately to the output.
* Others messages (if period is not over) are inserted into a <em>time
pipeline</em> implemented with a DelayQueue, waiting for timeout to be send
to output
* <p>If period is over, an incoming message will be send immediately to
output.
*
* As messages are not delayed but aggregated, this flowregulator need a
MessageAggregator implementation given by a factory.
*
* <p>example:
* <code>
* <p>camelContext.addComponent("flowregulator", new
FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
*
<p>from("timer://test?period=1000").to("flowregulator://test?period=5000");
* <p>from("flowregulator://test?period=5000").to...
* </code>
* @author bernard LAURANT
*/
public class FlowRelgulatorComponent extends DefaultComponent {
/**
* the time pipeline
*/
private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints = new
DelayQueue<FlowRelgulatorEndPoint>();
/**
* default period is set to 1s
*/
private long defaultPeriod = 1000;
private MessageAggregatorFactory messageAggregatorFactory;
public FlowRelgulatorComponent() {
super();
}
public FlowRelgulatorComponent(CamelContext camelContext) {
super(camelContext);
}
public FlowRelgulatorComponent(MessageAggregatorFactory
messageAggregatorFactory) {
super();
this.messageAggregatorFactory = messageAggregatorFactory;
}
public FlowRelgulatorComponent(CamelContext camelContext,
MessageAggregatorFactory messageAggregatorFactory) {
super(camelContext);
this.messageAggregatorFactory = messageAggregatorFactory;
}
@SuppressWarnings("unchecked")
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map
parameters) throws Exception {
Long period = (Long)getAndRemoveParameter(parameters, "period",
Long.class);
if (period == null) {
period = defaultPeriod;
}
FlowRelgulatorEndPoint flowRelgulatorEndPoint = new
FlowRelgulatorEndPoint(uri, this, remaining);
flowRelgulatorEndPoint.setPeriod(period);
flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
return flowRelgulatorEndPoint;
}
public void setDefaultPeriod(long defaultPeriod) {
this.defaultPeriod = defaultPeriod;
}
public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
synchronized (flowRelgulatorEndPoints) {
if
(!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
}
}
}
@Override
public void start() throws Exception {
super.start();
Thread pipeOut = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
FlowRelgulatorEndPoint
flowRelgulatorEndPoint =
flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
if (flowRelgulatorEndPoint !=
null) {
flowRelgulatorEndPoint.flush();
}
} catch (InterruptedException e) {
}
}
}
});
pipeOut.start();
}
public void setMessageAggregatorFactory(MessageAggregatorFactory
messageAggregatorFactory) {
this.messageAggregatorFactory = messageAggregatorFactory;
}
}
/**
* Endpoint for FlowRegulator.
*
* @see FlowRegulatorComponent
* @author bernard LAURANT
*/
public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
Delayed {
private MessageAggregator messageAggregator;
private ReguledConsumer reguledConsumer;
private String remaining;
private long period;
private long timeOut;
private long lastMessSent;
public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
flowRelgulatorComponent, String remaining) {
super(uri, flowRelgulatorComponent);
this.remaining = remaining;
}
@Override
public String toString() {
return createEndpointUri();
}
@Override
protected String createEndpointUri() {
return new
StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
}
@Override
public long getDelay(TimeUnit unit) {
long millis2PipeOut = -(System.currentTimeMillis() - timeOut);
return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
long thisDelay = getDelay(TimeUnit.MILLISECONDS);
long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
return (thisDelay<anotherDelay ? -1 : (thisDelay==anotherDelay
? 0 : 1));
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (int) (period ^ (period >>> 32));
result = prime * result + ((remaining == null) ? 0 :
remaining.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) obj;
if (remaining == null) {
if (other.remaining != null)
return false;
} else if (!remaining.equals(other.remaining))
return false;
if (period != other.period)
return false;
return true;
}
@Override
public Producer createProducer() throws Exception {
return new DefaultProducer(this) {
@Override
public void process(Exchange exchange) throws Exception
{
messageAggregator.agregateMessage(exchange.getIn());
if (lastMessSent == 0) {
flush();// first msg: send it
immediately
} else {
timeOut = lastMessSent + period; //
timeout to pipeout the msg
if (System.currentTimeMillis() >=
timeOut) {
flush(); // send it
immediately, because timeout has expired
} else {
// put it in the time pipeline
((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
}
}
}
};
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
if (reguledConsumer == null) {
reguledConsumer = new ReguledConsumer(this, processor);
}
return reguledConsumer;
}
public void flush() {
reguledConsumer.send();
lastMessSent = System.currentTimeMillis();
}
class ReguledConsumer extends DefaultConsumer {
public ReguledConsumer(Endpoint endPoint, Processor processor) {
super(endPoint, processor);
}
public void send() {
Exchange exchange = getEndpoint().createExchange();
exchange.setIn(messageAggregator.getAndClearMessage());
try {
getProcessor().process(exchange);
} catch (Exception e) {
handleException(e);
}
}
}
@Override
public boolean isSingleton() {
return true;
}
public void setPeriod(long period) {
this.period = period;
}
public void setAggregator(MessageAggregator messageAggregator) {
this.messageAggregator = messageAggregator;
}
}
The MessageAgregator interface :
public interface MessageAggregatorFactory {
MessageAggregator createMessageAggregator();
}
public interface MessageAggregator {
void agregateMessage(Message message);
void clearMessage();
Message getMessage();
Message getAndClearMessage();
}
Exemple for a Map :
public class MapMessageAggregatorFactory implements MessageAggregatorFactory
{
@Override
public MessageAggregator createMessageAggregator() {
return new MessageAggregator() {
Message data;
@Override
public synchronized Message getMessage() {
return data;
}
@Override
public synchronized void clearMessage() {
data = null;
}
@Override
public synchronized Message getAndClearMessage() {
Message res = getMessage();
clearMessage();
return res;
}
@SuppressWarnings("unchecked")
@Override
public synchronized void agregateMessage(Message
message) {
if (message == null)
return;
if (data == null) {
data = message;
} else {
Map newData = (Map) message.getBody();
((Map)data.getBody()).putAll(newData);
}
}
};
}
}
exemple for a string :
public class StringMessageAggregatorFactory implements
MessageAggregatorFactory {
@Override
public MessageAggregator createMessageAggregator() {
return new MessageAggregator() {
Message data;
@Override
public synchronized Message getMessage() {
return data;
}
@Override
public synchronized void clearMessage() {
data.setBody("");
}
@Override
public synchronized void agregateMessage(Message
message) {
if (data == null) {
data = message;
} else {
data.setBody((String)data.getBody() +
message.getBody());
data.getHeaders().putAll(message.getHeaders());
}
}
@Override
public synchronized Message getAndClearMessage() {
Message res = data;
data = null;
return res;
}
};
}
}
And a simple test :
public class TestFlow {
static CamelContext camelContext;
static int counter = 0;
public static void main(String[] args) throws Exception {
camelContext = new DefaultCamelContext();
camelContext.addComponent("flowregulator", new
FlowRelgulatorComponent(new
StringMessageAggregatorFactory()));
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("timer://test?period=1000")
.process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
System.out.println("timer : " +
exchange.getIn().getHeaders());
Message msg = exchange.getIn();
msg.setBody(Integer.toString(counter++));
exchange.setOut(msg);
}
})
.to("flowregulator://test?period=5000");
from("flowregulator://test?period=5000").process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
System.out.println("flowReg: "
+ exchange.getIn() + " " +
exchange.getIn().getHeaders());
}
});
}
});
camelContext.start();
}
}
--
View this message in context:
http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
Sent from the Camel Development mailing list archive at Nabble.com.