Hello, According to my use case, - I have to intake the message because I need the data inside it (aggregation is done with map.putAll()) - I have to delay endpoint processing message (or aggregated message) only if current time < time of the last message + period. (regulation) For this purpose, RoutePolicy looks lighter than Component. But I can’t see how I can avoid the use of a time pipeline (like DelayedQueue) ? Suspending/resuming consumer is not enough; I need to resume the consumer at a specific time with a specific message.
Details about use case : Message 1 BID: 12.2 ASK: 12.3 -> send this first message now Next message (if exist) will be sent in 1 period Message 2 TRADEPRICE = 12.3 Message 3 BID: 12.3 ASK: 12.4 Message 4 BID: 12.3 ASK: 12.5 -> 3 messages during the period: send message 2+3+4 (aggregation is done with map.putAll()) TRADEPRICE = 12.3 BID: 12.3 ASK: 12.5 Claus Ibsen-2 wrote: > > Hi > > Thanks for your contribution. And the idea of using DelayedQueue is good. > Its something we will add to Camel error handler in the future to > offer non blocked waiting while waiting to do next redelivery. > > In terms of dynamically throttle a route I believe Camel's RoutePolicy > is maybe more powerful > http://camel.apache.org/routepolicy.html > > It allows you to avoid intaking the messages which then wont risk of > loosing messages if they are temporary stored in an in memory > DelayedQueue. > > > > On Tue, Apr 13, 2010 at 3:45 PM, blau wrote: >> >> >> Hello, this is a little contribution to camel projet. >> >> Sometime you need to regulate (slow down) the flow of a stream. >> For example, imagine a marketdata flow where body of message is a >> Map. Update rate is far too important for your need >> (sometime more thant 20 mess/sec), so you want to regulate it at >> 1mess/sec. >> >> As messages are not delayed but aggregated, this flowregulator need a >> MessageAggregator strategy (implementation given by a factory). >> >> This implementation is based on the DelayQueue from java.util.concurrent. >> >> ex: >> from("timer://test?period=1000").to("flowregulator://test?period=5000"); >> >> /** >> * This camel component is able to regulate the flow of a route. >> * FlowRegulatorComponent uses a period parameter to trigger updates. >> * First input message is send immediately to the output. >> * Others messages (if period is not over) are inserted into a time >> pipeline implemented with a DelayQueue, waiting for timeout to be send >> to output >> * If period is over, an incoming message will be send immediately to >> output. >> * >> * As messages are not delayed but aggregated, this flowregulator need a >> MessageAggregator implementation given by a factory. >> * >> * example: >> * >> * camelContext.addComponent("flowregulator", new >> FlowRelgulatorComponent(new StringMessageAggregatorFactory())); >> * >> from("timer://test?period=1000").to("flowregulator://test?period=5000"); >> * from("flowregulator://test?period=5000").to... >> * >> * @author bernard LAURANT >> */ >> public class FlowRelgulatorComponent extends DefaultComponent { >> >> /** >> * the time pipeline >> */ >> private DelayQueue flowRelgulatorEndPoints = new >> DelayQueue(); >> >> /** >> * default period is set to 1s >> */ >> private long defaultPeriod = 1000; >> >> private MessageAggregatorFactory messageAggregatorFactory; >> >> public FlowRelgulatorComponent() { >> super(); >> } >> >> public FlowRelgulatorComponent(CamelContext camelContext) { >> super(camelContext); >> } >> >> public FlowRelgulatorComponent(MessageAggregatorFactory >> messageAggregatorFactory) { >> super(); >> this.messageAggregatorFactory = messageAggregatorFactory; >> } >> >> public FlowRelgulatorComponent(CamelContext camelContext, >> MessageAggregatorFactory messageAggregatorFactory) { >> super(camelContext); >> this.messageAggregatorFactory = messageAggregatorFactory; >> } >> >> �...@suppresswarnings("unchecked") >> �...@override >> protected Endpoint createEndpoint(String uri, String remaining, Map >> parameters) throws Exception { >> Long period = (Long)getAndRemoveParameter(parameters, >> "period", >> Long.class); >> if (period == null) { >> period = defaultPeriod; >> } >> FlowRelgulatorEndPoint flowRelgulatorEndPoint = new >> FlowRelgulatorEndPoint(uri, this, remaining); >> flowRelgulatorEndPoint.setPeriod(period); >> >> flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator()); >> return flowRelgulatorEndPoint; >> } >> >> public void setDefaultPeriod(long defaultPeriod) { >> this.defaultPeriod = defaultPeriod; >> } >> >> public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) >> { >> synchronized (flowRelgulatorEndPoints) { >> if >> (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) { >> >> flowRelgulatorEndPoints.put(flowRelgulatorEndPoint); >> } >> } >> } >> >> �...@override >> public void start() throws Exception { >> super.start(); >> Thread pipeOut = new Thread(new Runnable() { >> �...@override >> public void run() { >> while (true) { >> try { >> FlowRelgulatorEndPoint >> flowRelgulatorEndPoint = >> flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS); >> if (flowRelgulatorEndPoint >> != null) { >> >> flowRelgulatorEndPoint.flush(); >> } >> } catch (InterruptedException e) { >> } >> } >> } >> }); >> pipeOut.start(); >> } >> >> public void setMessageAggregatorFactory(MessageAggregatorFactory >> messageAggregatorFactory) { >> this.messageAggregatorFactory = messageAggregatorFactory; >> } >> } >> >> >> >> >> /** >> * Endpoint for FlowRegulator. >> * >> * @see FlowRegulatorComponent >> * @author bernard LAURANT >> */ >> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements >> Delayed { >> >> private MessageAggregator messageAggregator; >> private ReguledConsumer reguledConsumer; >> >> private String remaining; >> private long period; >> private long timeOut; >> private long lastMessSent; >> >> public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent >> flowRelgulatorComponent, String remaining) { >> super(uri, flowRelgulatorComponent); >> this.remaining = remaining; >> } >> >> �...@override >> public String toString() { >> return createEndpointUri(); >> } >> >> �...@override >> protected String createEndpointUri() { >> return new >> StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString(); >> } >> >> �...@override >> public long getDelay(TimeUnit unit) { >> long millis2PipeOut = -(System.currentTimeMillis() - >> timeOut); >> return unit.convert(millis2PipeOut, >> TimeUnit.MILLISECONDS); >> } >> >> �...@override >> public int compareTo(Delayed o) { >> long thisDelay = getDelay(TimeUnit.MILLISECONDS); >> long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS); >> return (thisDelay } >> >> �...@override >> public int hashCode() { >> final int prime = 31; >> int result = super.hashCode(); >> result = prime * result + (int) (period ^ (period >>> >> 32)); >> result = prime * result + ((remaining == null) ? 0 : >> remaining.hashCode()); >> return result; >> } >> >> �...@override >> public boolean equals(Object obj) { >> if (this == obj) >> return true; >> if (!super.equals(obj)) >> return false; >> if (getClass() != obj.getClass()) >> return false; >> FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) >> obj; >> if (remaining == null) { >> if (other.remaining != null) >> return false; >> } else if (!remaining.equals(other.remaining)) >> return false; >> if (period != other.period) >> return false; >> return true; >> } >> >> >> �...@override >> public Producer createProducer() throws Exception { >> return new DefaultProducer(this) { >> �...@override >> public void process(Exchange exchange) throws >> Exception { >> >> messageAggregator.agregateMessage(exchange.getIn()); >> if (lastMessSent == 0) { >> flush();// first msg: send it >> immediately >> } else { >> timeOut = lastMessSent + period; >> // timeout to pipeout the msg >> if (System.currentTimeMillis() >= >> timeOut) { >> flush(); // send it >> immediately, because timeout has expired >> } else { >> // put it in the time >> pipeline >> >> ((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this); >> } >> } >> } >> }; >> } >> >> �...@override >> public Consumer createConsumer(Processor processor) throws >> Exception { >> if (reguledConsumer == null) { >> reguledConsumer = new ReguledConsumer(this, >> processor); >> } >> return reguledConsumer; >> } >> >> public void flush() { >> reguledConsumer.send(); >> lastMessSent = System.currentTimeMillis(); >> } >> >> class ReguledConsumer extends DefaultConsumer { >> >> public ReguledConsumer(Endpoint endPoint, Processor >> processor) { >> super(endPoint, processor); >> } >> >> public void send() { >> Exchange exchange = >> getEndpoint().createExchange(); >> >> exchange.setIn(messageAggregator.getAndClearMessage()); >> try { >> getProcessor().process(exchange); >> } catch (Exception e) { >> handleException(e); >> } >> } >> } >> >> �...@override >> public boolean isSingleton() { >> return true; >> } >> >> public void setPeriod(long period) { >> this.period = period; >> } >> >> public void setAggregator(MessageAggregator messageAggregator) { >> this.messageAggregator = messageAggregator; >> } >> } >> >> >> The MessageAgregator interface : >> >> public interface MessageAggregatorFactory { >> >> MessageAggregator createMessageAggregator(); >> } >> >> public interface MessageAggregator { >> void agregateMessage(Message message); >> void clearMessage(); >> Message getMessage(); >> Message getAndClearMessage(); >> } >> >> Exemple for a Map : >> >> public class MapMessageAggregatorFactory implements >> MessageAggregatorFactory >> { >> >> �...@override >> public MessageAggregator createMessageAggregator() { >> return new MessageAggregator() { >> >> Message data; >> >> �...@override >> public synchronized Message getMessage() { >> return data; >> } >> >> �...@override >> public synchronized void clearMessage() { >> data = null; >> } >> >> �...@override >> public synchronized Message getAndClearMessage() { >> Message res = getMessage(); >> clearMessage(); >> return res; >> } >> >> �...@suppresswarnings("unchecked") >> �...@override >> public synchronized void agregateMessage(Message >> message) { >> if (message == null) >> return; >> if (data == null) { >> data = message; >> } else { >> Map newData = (Map) >> message.getBody(); >> >> ((Map)data.getBody()).putAll(newData); >> } >> } >> }; >> } >> } >> >> exemple for a string : >> >> public class StringMessageAggregatorFactory implements >> MessageAggregatorFactory { >> >> �...@override >> public MessageAggregator createMessageAggregator() { >> return new MessageAggregator() { >> >> Message data; >> >> �...@override >> public synchronized Message getMessage() { >> return data; >> } >> >> �...@override >> public synchronized void clearMessage() { >> data.setBody(""); >> } >> >> �...@override >> public synchronized void agregateMessage(Message >> message) { >> if (data == null) { >> data = message; >> } else { >> >> data.setBody((String)data.getBody() + message.getBody()); >> >> data.getHeaders().putAll(message.getHeaders()); >> } >> } >> >> �...@override >> public synchronized Message getAndClearMessage() { >> Message res = data; >> data = null; >> return res; >> } >> }; >> } >> } >> >> And a simple test : >> >> public class TestFlow { >> >> static CamelContext camelContext; >> static int counter = 0; >> >> public static void main(String[] args) throws Exception { >> camelContext = new DefaultCamelContext(); >> camelContext.addComponent("flowregulator", new >> FlowRelgulatorComponent(new >> StringMessageAggregatorFactory())); >> >> camelContext.addRoutes(new RouteBuilder() { >> �...@override >> public void configure() throws Exception { >> from("timer://test?period=1000") >> .process(new Processor() { >> �...@override >> public void process(Exchange >> exchange) throws Exception { >> System.out.println("timer >> : " + exchange.getIn().getHeaders()); >> Message msg = >> exchange.getIn(); >> >> msg.setBody(Integer.toString(counter++)); >> exchange.setOut(msg); >> } >> }) >> .to("flowregulator://test?period=5000"); >> >> >> from("flowregulator://test?period=5000").process(new Processor() { >> �...@override >> public void process(Exchange >> exchange) throws Exception { >> >> System.out.println("flowReg: " + exchange.getIn() + " " + >> exchange.getIn().getHeaders()); >> } >> }); >> } >> }); >> camelContext.start(); >> } >> >> } >> >> >> >> -- >> View this message in context: >> http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html >> Sent from the Camel Development mailing list archive at Nabble.com. >> >> > > > > -- > Claus Ibsen > Apache Camel Committer > > Author of Camel in Action: http://www.manning.com/ibsen/ > Open Source Integration: http://fusesource.com > Blog: http://davsclaus.blogspot.com/ > Twitter: http://twitter.com/davsclaus > > -- View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28243300.html Sent from the Camel Development mailing list archive at Nabble.com.
