Hello,

For my use case:
- I have to take in every message, because I need the data inside it
(aggregation is done with map.putAll()).
- I have to delay the endpoint's processing of a message (or of the aggregated
message) only if current time < time of the last sent message + period
(regulation).
 
For this purpose, a RoutePolicy looks lighter than a Component, but I can't see
how to avoid using a time pipeline (such as a DelayQueue).
Suspending/resuming the consumer is not enough: I need to resume the consumer at
a specific time with a specific message.
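
To make "a specific time with a specific message" concrete, here is a minimal
sketch of a DelayQueue element that carries a payload and a fixed release time.
The class name TimedRelease and its fields are hypothetical, chosen only for
illustration; they are not part of Camel or of the component quoted below.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder: a payload that a DelayQueue releases at a fixed wall-clock time. */
class TimedRelease<T> implements Delayed {
    final T payload;
    final long releaseAtMillis;

    TimedRelease(T payload, long releaseAtMillis) {
        this.payload = payload;
        this.releaseAtMillis = releaseAtMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until release; zero or negative means "available now".
        long remaining = releaseAtMillis - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // The element with the shortest remaining delay sits at the head of the queue.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

A consumer thread can then call take() on a DelayQueue of such elements; it
blocks until the head element's release time has passed, which is the
behaviour that suspending and resuming a consumer does not give by itself.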


Details about the use case:

Message 1
BID: 12.2
ASK: 12.3

-> send this first message now
The next message (if any) will be sent one period later

Message 2
TRADEPRICE = 12.3

Message 3
BID: 12.3
ASK: 12.4

Message 4
BID: 12.3
ASK: 12.5

-> 3 messages arrived during the period: send messages 2+3+4 as one aggregated
message (aggregation is done with map.putAll())
TRADEPRICE = 12.3
BID: 12.3
ASK: 12.5
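
The rule illustrated above can be sketched as follows. This is only an
illustration under my own assumptions: the class MapFlowRegulator and its
methods are hypothetical names, the message body is assumed to be a
Map<String, Object>, and the clock is passed in explicitly so the logic can be
followed without a timer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: merge updates with putAll and release at most one map per period. */
class MapFlowRegulator {
    private final long periodMillis;
    private final Map<String, Object> pending = new LinkedHashMap<>();
    private long lastSentMillis = -1; // -1 means nothing has been sent yet

    MapFlowRegulator(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    /** Merges an update; returns the map to send now, or null if it has to wait. */
    synchronized Map<String, Object> offer(Map<String, Object> update, long nowMillis) {
        pending.putAll(update); // later values overwrite earlier ones for the same key
        return flushIfDue(nowMillis);
    }

    /** To be called by the time pipeline once the period may have elapsed. */
    synchronized Map<String, Object> flushIfDue(long nowMillis) {
        boolean due = lastSentMillis < 0 || nowMillis >= lastSentMillis + periodMillis;
        if (!due || pending.isEmpty()) {
            return null;
        }
        Map<String, Object> out = new LinkedHashMap<>(pending);
        pending.clear();
        lastSentMillis = nowMillis;
        return out;
    }
}
```

With a period of 1000 ms, offering message 1 at t=0 returns it immediately,
offering messages 2, 3 and 4 before t=1000 returns null each time, and
flushIfDue(1000) returns the merged map with TRADEPRICE, BID and ASK.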



Claus Ibsen-2 wrote:
> 
> Hi
> 
> Thanks for your contribution. And the idea of using DelayQueue is good.
> It's something we will add to the Camel error handler in the future to
> offer non-blocking waiting while waiting to do the next redelivery.
> 
> In terms of dynamically throttling a route, I believe Camel's RoutePolicy
> may be more powerful
> http://camel.apache.org/routepolicy.html
> 
> It allows you to avoid taking in the messages at all, so there is no risk
> of losing messages that are temporarily stored in an in-memory
> DelayQueue.
> 
> 
> 
> On Tue, Apr 13, 2010 at 3:45 PM, blau  wrote:
>>
>>
>> Hello, this is a small contribution to the Camel project.
>>
>> Sometimes you need to regulate (slow down) the flow of a stream.
>> For example, imagine a market data flow where the body of each message is
>> a Map. The update rate is far too high for your needs
>> (sometimes more than 20 msg/sec), so you want to regulate it to
>> 1 msg/sec.
>>
>> As messages are not delayed but aggregated, this flowregulator needs a
>> MessageAggregator strategy (implementation given by a factory).
>>
>> This implementation is based on the DelayQueue from java.util.concurrent.
>>
>> ex:
>> from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>
>> /**
>>  * This camel component is able to regulate the flow of a route.
>>  *
>>  * FlowRegulatorComponent uses a period parameter to trigger updates.
>>  *
>>  * The first input message is sent immediately to the output.
>>  * Other messages (if the period is not over) are inserted into a time
>>  * pipeline implemented with a DelayQueue, waiting for the timeout before
>>  * being sent to the output.
>>  *
>>  * If the period is over, an incoming message is sent immediately to the
>>  * output.
>>  *
>>  * As messages are not delayed but aggregated, this flowregulator needs a
>>  * MessageAggregator implementation given by a factory.
>>  *
>>  * example:
>>  *
>>  * camelContext.addComponent("flowregulator", new
>>  * FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
>>  *
>>  * from("timer://test?period=1000").to("flowregulator://test?period=5000");
>>  *
>>  * from("flowregulator://test?period=5000").to...
>>  *
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorComponent extends DefaultComponent {
>>
>>        /**
>>         * the time pipeline
>>         */
>>        private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints =
>> new DelayQueue<FlowRelgulatorEndPoint>();
>>
>>        /**
>>         * default period is set to 1s
>>         */
>>        private long defaultPeriod = 1000;
>>
>>        private MessageAggregatorFactory messageAggregatorFactory;
>>
>>        public FlowRelgulatorComponent() {
>>                super();
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext) {
>>                super(camelContext);
>>        }
>>
>>        public FlowRelgulatorComponent(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                super();
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        public FlowRelgulatorComponent(CamelContext camelContext,
>> MessageAggregatorFactory messageAggregatorFactory) {
>>                super(camelContext);
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>>
>>        @SuppressWarnings("unchecked")
>>        @Override
>>        protected Endpoint createEndpoint(String uri, String remaining,
>>                        Map parameters) throws Exception {
>>                Long period = (Long) getAndRemoveParameter(parameters,
>>                                "period", Long.class);
>>                if (period == null) {
>>                        period = defaultPeriod;
>>                }
>>                FlowRelgulatorEndPoint flowRelgulatorEndPoint =
>>                                new FlowRelgulatorEndPoint(uri, this, remaining);
>>                flowRelgulatorEndPoint.setPeriod(period);
>>                flowRelgulatorEndPoint.setAggregator(
>>                                messageAggregatorFactory.createMessageAggregator());
>>                return flowRelgulatorEndPoint;
>>        }
>>
>>        public void setDefaultPeriod(long defaultPeriod) {
>>                this.defaultPeriod = defaultPeriod;
>>        }
>>
>>        public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
>>                synchronized (flowRelgulatorEndPoints) {
>>                        if (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
>>                                flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public void start() throws Exception {
>>                super.start();
>>                Thread pipeOut = new Thread(new Runnable() {
>>                        @Override
>>                        public void run() {
>>                                while (true) {
>>                                        try {
>>                                                FlowRelgulatorEndPoint flowRelgulatorEndPoint =
>>                                                        flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
>>                                                if (flowRelgulatorEndPoint != null) {
>>                                                        flowRelgulatorEndPoint.flush();
>>                                                }
>>                                        } catch (InterruptedException e) {
>>                                                // restore the interrupt flag and stop polling
>>                                                Thread.currentThread().interrupt();
>>                                                return;
>>                                        }
>>                                }
>>                        }
>>                });
>>                pipeOut.start();
>>        }
>>
>>        public void setMessageAggregatorFactory(MessageAggregatorFactory
>> messageAggregatorFactory) {
>>                this.messageAggregatorFactory = messageAggregatorFactory;
>>        }
>> }
>>
>>
>>
>>
>> /**
>>  * Endpoint for FlowRegulator.
>>  *
>>  * @see FlowRegulatorComponent
>>  * @author bernard LAURANT
>>  */
>> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
>> Delayed {
>>
>>        private MessageAggregator messageAggregator;
>>        private ReguledConsumer reguledConsumer;
>>
>>        private String remaining;
>>        private long period;
>>        private long timeOut;
>>        private long lastMessSent;
>>
>>        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
>> flowRelgulatorComponent, String remaining) {
>>                super(uri, flowRelgulatorComponent);
>>                this.remaining = remaining;
>>        }
>>
>>        @Override
>>        public String toString() {
>>                return createEndpointUri();
>>        }
>>
>>        @Override
>>        protected String createEndpointUri() {
>>                return new StringBuilder().append("flowregulator://").append(remaining)
>>                                .append("?period=").append(period).toString();
>>        }
>>
>>        @Override
>>        public long getDelay(TimeUnit unit) {
>>                // remaining time before this endpoint leaves the time pipeline
>>                long millis2PipeOut = timeOut - System.currentTimeMillis();
>>                return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
>>        }
>>
>>        @Override
>>        public int compareTo(Delayed o) {
>>                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
>>                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
>>                return (thisDelay < anotherDelay ? -1 : (thisDelay == anotherDelay ? 0 : 1));
>>        }
>>
>>        @Override
>>        public int hashCode() {
>>                final int prime = 31;
>>                int result = super.hashCode();
>>                result = prime * result + (int) (period ^ (period >>>
>> 32));
>>                result = prime * result + ((remaining == null) ? 0 :
>> remaining.hashCode());
>>                return result;
>>        }
>>
>>        @Override
>>        public boolean equals(Object obj) {
>>                if (this == obj)
>>                        return true;
>>                if (!super.equals(obj))
>>                        return false;
>>                if (getClass() != obj.getClass())
>>                        return false;
>>                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint)
>> obj;
>>                if (remaining == null) {
>>                        if (other.remaining != null)
>>                                return false;
>>                } else if (!remaining.equals(other.remaining))
>>                        return false;
>>                if (period != other.period)
>>                        return false;
>>                return true;
>>        }
>>
>>
>>        @Override
>>        public Producer createProducer() throws Exception {
>>                return new DefaultProducer(this) {
>>                        @Override
>>                        public void process(Exchange exchange) throws Exception {
>>                                messageAggregator.agregateMessage(exchange.getIn());
>>                                if (lastMessSent == 0) {
>>                                        flush(); // first msg: send it immediately
>>                                } else {
>>                                        timeOut = lastMessSent + period; // timeout to pipe out the msg
>>                                        if (System.currentTimeMillis() >= timeOut) {
>>                                                flush(); // send it immediately, because timeout has expired
>>                                        } else {
>>                                                // put it in the time pipeline
>>                                                ((FlowRelgulatorComponent) FlowRelgulatorEndPoint.this.getComponent())
>>                                                                .pipeIn(FlowRelgulatorEndPoint.this);
>>                                        }
>>                                }
>>                        }
>>                };
>>        }
>>
>>        @Override
>>        public Consumer createConsumer(Processor processor) throws Exception {
>>                if (reguledConsumer == null) {
>>                        reguledConsumer = new ReguledConsumer(this, processor);
>>                }
>>                return reguledConsumer;
>>        }
>>
>>        public void flush() {
>>                reguledConsumer.send();
>>                lastMessSent = System.currentTimeMillis();
>>        }
>>
>>        class ReguledConsumer extends DefaultConsumer {
>>
>>                public ReguledConsumer(Endpoint endPoint, Processor
>> processor) {
>>                        super(endPoint, processor);
>>                }
>>
>>                public void send() {
>>                        Exchange exchange = getEndpoint().createExchange();
>>                        exchange.setIn(messageAggregator.getAndClearMessage());
>>                        try {
>>                                getProcessor().process(exchange);
>>                        } catch (Exception e) {
>>                                handleException(e);
>>                        }
>>                }
>>        }
>>
>>        @Override
>>        public boolean isSingleton() {
>>                return true;
>>        }
>>
>>        public void setPeriod(long period) {
>>                this.period = period;
>>        }
>>
>>        public void setAggregator(MessageAggregator messageAggregator) {
>>                this.messageAggregator = messageAggregator;
>>        }
>> }
>>
>>
>> The MessageAggregator interfaces:
>>
>> public interface MessageAggregatorFactory {
>>
>>        MessageAggregator createMessageAggregator();
>> }
>>
>> public interface MessageAggregator {
>>        void agregateMessage(Message message);
>>        void clearMessage();
>>        Message getMessage();
>>        Message getAndClearMessage();
>> }
>>
>> Example for a Map:
>>
>> public class MapMessageAggregatorFactory implements
>> MessageAggregatorFactory
>> {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data = null;
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = getMessage();
>>                                clearMessage();
>>                                return res;
>>                        }
>>
>>                        @SuppressWarnings("unchecked")
>>                        @Override
>>                        public synchronized void agregateMessage(Message message) {
>>                                if (message == null)
>>                                        return;
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                        Map newData = (Map) message.getBody();
>>                                        ((Map) data.getBody()).putAll(newData);
>>                                }
>>                        }
>>                };
>>        }
>> }
>>
>> Example for a String:
>>
>> public class StringMessageAggregatorFactory implements
>> MessageAggregatorFactory {
>>
>>        @Override
>>        public MessageAggregator createMessageAggregator() {
>>                return new MessageAggregator() {
>>
>>                        Message data;
>>
>>                        @Override
>>                        public synchronized Message getMessage() {
>>                                return data;
>>                        }
>>
>>                        @Override
>>                        public synchronized void clearMessage() {
>>                                data.setBody("");
>>                        }
>>
>>                        @Override
>>                        public synchronized void agregateMessage(Message message) {
>>                                if (data == null) {
>>                                        data = message;
>>                                } else {
>>                                        data.setBody((String) data.getBody() + message.getBody());
>>                                        data.getHeaders().putAll(message.getHeaders());
>>                                }
>>                        }
>>
>>                        @Override
>>                        public synchronized Message getAndClearMessage() {
>>                                Message res = data;
>>                                data = null;
>>                                return res;
>>                        }
>>                };
>>        }
>> }
>>
>> And a simple test:
>>
>> public class TestFlow {
>>
>>        static CamelContext camelContext;
>>        static int counter = 0;
>>
>>        public static void main(String[] args) throws Exception {
>>                camelContext = new DefaultCamelContext();
>>                camelContext.addComponent("flowregulator", new
>> FlowRelgulatorComponent(new
>> StringMessageAggregatorFactory()));
>>
>>                camelContext.addRoutes(new RouteBuilder() {
>>                        @Override
>>                        public void configure() throws Exception {
>>                                from("timer://test?period=1000")
>>                                .process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange exchange) throws Exception {
>>                                                System.out.println("timer : " + exchange.getIn().getHeaders());
>>                                                Message msg = exchange.getIn();
>>                                                msg.setBody(Integer.toString(counter++));
>>                                                exchange.setOut(msg);
>>                                        }
>>                                })
>>                                .to("flowregulator://test?period=5000");
>>
>>                                from("flowregulator://test?period=5000").process(new Processor() {
>>                                        @Override
>>                                        public void process(Exchange exchange) throws Exception {
>>                                                System.out.println("flowReg: " + exchange.getIn() + " " + exchange.getIn().getHeaders());
>>                                        }
>>                                });
>>                        }
>>                });
>>                camelContext.start();
>>        }
>>
>> }
>>
>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
>> Sent from the Camel Development mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> Claus Ibsen
> Apache Camel Committer
> 
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
> 
> 

-- 
View this message in context: 
http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28243300.html
Sent from the Camel Development mailing list archive at Nabble.com.
