Hello, this is a small contribution to the Camel project.

Sometimes you need to regulate (slow down) the flow of a stream.
For example, imagine a market-data flow where the body of each message is a
Map<String, Object>. The update rate is far too high for your needs
(sometimes more than 20 msg/sec), so you want to regulate it to 1 msg/sec.

Since messages are aggregated rather than delayed, this flow regulator needs a
MessageAggregator strategy (the implementation is supplied by a factory).

This implementation is based on the DelayQueue from java.util.concurrent.

ex: from("timer://test?period=1000").to("flowregulator://test?period=5000");

/**
 * Camel component that regulates (slows down) the flow of a route.
 *
 * <p>Each endpoint is created with a {@code period} parameter (milliseconds).
 * The first message received by an endpoint is forwarded immediately. Messages
 * arriving before the period has elapsed are aggregated, and the endpoint is
 * placed in a <em>time pipeline</em> (a {@link DelayQueue}); a background
 * thread flushes the endpoint once its delay has expired. A message arriving
 * after the period has elapsed is forwarded immediately.
 *
 * <p>Because messages are aggregated rather than delayed, the component needs
 * a {@code MessageAggregatorFactory} that supplies one aggregator per endpoint.
 *
 * <p>Example:
 * <pre>
 * camelContext.addComponent("flowregulator",
 *         new FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
 *
 * from("timer://test?period=1000").to("flowregulator://test?period=5000");
 * from("flowregulator://test?period=5000").to...
 * </pre>
 *
 * @author bernard LAURANT
 */
public class FlowRelgulatorComponent extends DefaultComponent {

    /** Maximum time, in milliseconds, a single poll of the time pipeline blocks. */
    private static final long POLL_TIMEOUT_MILLIS = 200;

    /** The time pipeline: endpoints waiting for their period to elapse. */
    private final DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints =
            new DelayQueue<FlowRelgulatorEndPoint>();

    /** Period used when the endpoint URI has no {@code period} parameter (1 s). */
    private long defaultPeriod = 1000;

    private MessageAggregatorFactory messageAggregatorFactory;

    /** Background thread draining the time pipeline; null while not started. */
    private Thread pipeOut;

    public FlowRelgulatorComponent() {
        super();
    }

    public FlowRelgulatorComponent(CamelContext camelContext) {
        super(camelContext);
    }

    public FlowRelgulatorComponent(MessageAggregatorFactory messageAggregatorFactory) {
        super();
        this.messageAggregatorFactory = messageAggregatorFactory;
    }

    public FlowRelgulatorComponent(CamelContext camelContext,
            MessageAggregatorFactory messageAggregatorFactory) {
        super(camelContext);
        this.messageAggregatorFactory = messageAggregatorFactory;
    }

    /**
     * Creates a flow-regulator endpoint configured with the {@code period} URI
     * parameter (falling back to the default period) and a fresh aggregator.
     *
     * @throws IllegalStateException if no aggregator factory has been configured
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map parameters)
            throws Exception {
        if (messageAggregatorFactory == null) {
            // Fail with an explicit message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "No MessageAggregatorFactory configured on the flowregulator component");
        }
        Long period = (Long) getAndRemoveParameter(parameters, "period", Long.class);
        if (period == null) {
            period = defaultPeriod;
        }
        FlowRelgulatorEndPoint flowRelgulatorEndPoint =
                new FlowRelgulatorEndPoint(uri, this, remaining);
        flowRelgulatorEndPoint.setPeriod(period);
        flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
        return flowRelgulatorEndPoint;
    }

    public void setDefaultPeriod(long defaultPeriod) {
        this.defaultPeriod = defaultPeriod;
    }

    /**
     * Puts the endpoint into the time pipeline unless an equal endpoint is
     * already waiting there.
     *
     * @param flowRelgulatorEndPoint the endpoint holding aggregated data to flush later
     */
    public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
        // The lock makes the contains/put pair atomic with respect to other pipeIn calls.
        synchronized (flowRelgulatorEndPoints) {
            if (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint)) {
                flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
            }
        }
    }

    /**
     * Starts the component and the background thread that flushes endpoints
     * whose delay has expired. Calling it again while the thread is alive does
     * not spawn a second thread.
     */
    @Override
    public void start() throws Exception {
        super.start();
        if (pipeOut != null && pipeOut.isAlive()) {
            return;
        }
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        FlowRelgulatorEndPoint expired =
                                flowRelgulatorEndPoints.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                        if (expired != null) {
                            expired.flush();
                        }
                    } catch (InterruptedException e) {
                        // Interruption is the shutdown signal: restore the flag so the loop exits.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "flowregulator-pipeOut");
        pipeOut = worker;
        worker.start();
    }

    /**
     * Stops the background thread (by interrupting it) and then the component.
     */
    @Override
    public void stop() throws Exception {
        Thread worker = pipeOut;
        pipeOut = null;
        if (worker != null) {
            worker.interrupt();
        }
        super.stop();
    }

    public void setMessageAggregatorFactory(MessageAggregatorFactory messageAggregatorFactory) {
        this.messageAggregatorFactory = messageAggregatorFactory;
    }
}




/**
 * Endpoint for FlowRegulator.
 * 
 * @see FlowRegulatorComponent
 * @author bernard LAURANT
 */
public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
Delayed {

        private MessageAggregator messageAggregator;
        private ReguledConsumer reguledConsumer;
        
        private String remaining;
        private long period;
        private long timeOut;
        private long lastMessSent;
        
        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
flowRelgulatorComponent, String remaining) {
                super(uri, flowRelgulatorComponent);
                this.remaining = remaining;
        }

        @Override
        public String toString() {
                return createEndpointUri();
        }
        
        @Override
        protected String createEndpointUri() {
                return new
StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
                long millis2PipeOut = -(System.currentTimeMillis() - timeOut);  
                return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
                return (thisDelay<anotherDelay ? -1 : (thisDelay==anotherDelay 
? 0 : 1));
        }
        
        @Override
        public int hashCode() {
                final int prime = 31;
                int result = super.hashCode();
                result = prime * result + (int) (period ^ (period >>> 32));
                result = prime * result + ((remaining == null) ? 0 :
remaining.hashCode());
                return result;
        }

        @Override
        public boolean equals(Object obj) {
                if (this == obj)
                        return true;
                if (!super.equals(obj))
                        return false;
                if (getClass() != obj.getClass())
                        return false;
                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) obj;
                if (remaining == null) {
                        if (other.remaining != null)
                                return false;
                } else if (!remaining.equals(other.remaining))
                        return false;
                if (period != other.period)
                        return false;
                return true;
        }


        @Override
        public Producer createProducer() throws Exception {
                return new DefaultProducer(this) {
                        @Override
                        public void process(Exchange exchange) throws Exception 
{
                                
messageAggregator.agregateMessage(exchange.getIn());
                                if (lastMessSent == 0) {
                                        flush();// first msg: send it 
immediately
                                } else {
                                        timeOut = lastMessSent + period; // 
timeout to pipeout the msg
                                        if (System.currentTimeMillis() >= 
timeOut) {
                                                flush(); // send it 
immediately, because timeout has expired
                                        } else {
                                                // put it in the time pipeline
                                        
((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
      
                                        }
                                }
                        }
                };
        }

        @Override
        public Consumer createConsumer(Processor processor) throws Exception {
                if (reguledConsumer == null) {
                        reguledConsumer = new ReguledConsumer(this, processor);
                }
                return reguledConsumer; 
        }
        
        public void flush() {
                reguledConsumer.send();
                lastMessSent = System.currentTimeMillis();
        }
        
        class ReguledConsumer extends DefaultConsumer {

                public ReguledConsumer(Endpoint endPoint, Processor processor) {
                        super(endPoint, processor);
                }

                public void send() {
                        Exchange exchange = getEndpoint().createExchange();
                        exchange.setIn(messageAggregator.getAndClearMessage());
                        try {
                                getProcessor().process(exchange);
                        } catch (Exception e) {
                                handleException(e);
                        }
                }
        }

        @Override
        public boolean isSingleton() {
                return true;
        }
        
        public void setPeriod(long period) {
                this.period = period;
        }
        
        public void setAggregator(MessageAggregator messageAggregator) {
                this.messageAggregator = messageAggregator;
        }
}


The MessageAggregatorFactory and MessageAggregator interfaces:

/**
 * Factory for {@link MessageAggregator} instances.
 * FlowRelgulatorComponent calls it once for every endpoint it creates.
 */
public interface MessageAggregatorFactory {

        /**
         * @return the aggregator to install on a newly created endpoint
         */
        MessageAggregator createMessageAggregator();
}

/**
 * Strategy that combines the messages received by a flow-regulator endpoint
 * into a single message to be emitted later.
 */
public interface MessageAggregator {
        /** Merges the given message into the aggregated message held so far. */
        void agregateMessage(Message message);
        /** Clears the aggregated content; how far the state is reset is up to the implementation. */
        void clearMessage();
        /** Returns the aggregated message without clearing it. */
        Message getMessage();
        /** Returns the aggregated message and clears the aggregator; the endpoint's consumer uses this when sending. */
        Message getAndClearMessage();
}

Example for a Map:

/**
 * Factory of aggregators for messages whose body is a {@link Map}.
 * The first message received becomes the aggregate; the entries of later
 * bodies are merged into its body map with {@code putAll}, so later values
 * overwrite earlier ones for the same key.
 */
public class MapMessageAggregatorFactory implements MessageAggregatorFactory {

    @Override
    public MessageAggregator createMessageAggregator() {
        return new MessageAggregator() {

            /** The aggregate; null when nothing has been aggregated since the last clear. */
            Message data;

            @Override
            public synchronized Message getMessage() {
                return data;
            }

            @Override
            public synchronized void clearMessage() {
                data = null;
            }

            @Override
            public synchronized Message getAndClearMessage() {
                Message res = getMessage();
                clearMessage();
                return res;
            }

            /**
             * Merges the body map of the message into the aggregate.
             * A null message or a null body is ignored.
             */
            @SuppressWarnings("unchecked")
            @Override
            public synchronized void agregateMessage(Message message) {
                if (message == null) {
                    return;
                }
                if (data == null) {
                    data = message;
                    return;
                }
                Map newData = (Map) message.getBody();
                if (newData == null) {
                    // Nothing to merge; putAll(null) would throw.
                    return;
                }
                Map current = (Map) data.getBody();
                if (current == null) {
                    // The aggregate was started by a message without a body.
                    data.setBody(newData);
                } else {
                    current.putAll(newData);
                }
            }
        };
    }
}

Example for a String:

/**
 * Factory of aggregators that concatenate message bodies as text.
 * The first message received becomes the aggregate; the bodies of later
 * messages are appended to its body and their headers are copied onto it.
 */
public class StringMessageAggregatorFactory implements MessageAggregatorFactory {

    @Override
    public MessageAggregator createMessageAggregator() {
        return new MessageAggregator() {

            /** The aggregate; null when nothing has been aggregated since the last reset. */
            Message data;

            @Override
            public synchronized Message getMessage() {
                return data;
            }

            /** Empties the body of the aggregate, if there is one. */
            @Override
            public synchronized void clearMessage() {
                if (data != null) {
                    data.setBody("");
                }
            }

            /**
             * Appends the body of the message to the aggregate and copies its
             * headers. A null message is ignored; a null body contributes no text.
             */
            @Override
            public synchronized void agregateMessage(Message message) {
                if (message == null) {
                    return;
                }
                if (data == null) {
                    data = message;
                    return;
                }
                Object incoming = message.getBody();
                if (incoming != null) {
                    Object current = data.getBody();
                    // Avoid concatenating the literal "null" when the aggregate has no body yet.
                    String prefix = (current == null) ? "" : current.toString();
                    data.setBody(prefix + incoming);
                }
                data.getHeaders().putAll(message.getHeaders());
            }

            @Override
            public synchronized Message getAndClearMessage() {
                Message res = data;
                data = null;
                return res;
            }
        };
    }
}

And a simple test:

/**
 * Small manual demo: a timer emits a counter every second and the flow
 * regulator forwards the concatenated counters every five seconds.
 */
public class TestFlow {

    static CamelContext camelContext;
    static int counter = 0;

    public static void main(String[] args) throws Exception {
        camelContext = new DefaultCamelContext();

        FlowRelgulatorComponent regulator =
                new FlowRelgulatorComponent(new StringMessageAggregatorFactory());
        camelContext.addComponent("flowregulator", regulator);

        camelContext.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Prints the timer headers and replaces the body with the next counter value.
                final Processor stampCounter = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        Message in = exchange.getIn();
                        System.out.println("timer : " + in.getHeaders());
                        in.setBody(Integer.toString(counter++));
                        exchange.setOut(in);
                    }
                };

                // Prints whatever the flow regulator lets through.
                final Processor printRegulated = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        Message in = exchange.getIn();
                        System.out.println("flowReg: " + in + " " + in.getHeaders());
                    }
                };

                from("timer://test?period=1000")
                        .process(stampCounter)
                        .to("flowregulator://test?period=5000");

                from("flowregulator://test?period=5000")
                        .process(printRegulated);
            }
        });

        camelContext.start();
    }

}



-- 
View this message in context: 
http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
Sent from the Camel Development mailing list archive at Nabble.com.

Reply via email to