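Hi

For parallel processing, use the concurrentConsumers and maxConcurrentConsumers options on the JMS endpoint. That is better than routing through SEDA: each message is then processed directly on the JMS consumer thread that picked it up.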
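As a rough sketch of what that could look like: the helper below only builds the endpoint URI, so it runs without Camel on the classpath. The class name, the method name and the numbers 4 and 8 are made up for illustration; only the two option names and the queue name "start1" come from this thread.

```java
public class JmsParallelEndpoint {

    // Builds an ActiveMQ consumer URI with a range of concurrent consumers.
    // The helper itself is hypothetical; concurrentConsumers and
    // maxConcurrentConsumers are the JMS endpoint options named above.
    public static String consumerUri(String queue, int concurrentConsumers, int maxConcurrentConsumers) {
        if (concurrentConsumers < 1 || maxConcurrentConsumers < concurrentConsumers) {
            throw new IllegalArgumentException(
                    "need 1 <= concurrentConsumers <= maxConcurrentConsumers");
        }
        return "activemq:" + queue
                + "?concurrentConsumers=" + concurrentConsumers
                + "&maxConcurrentConsumers=" + maxConcurrentConsumers;
    }

    public static void main(String[] args) {
        // Inside RouteBuilder.configure() the URI would be consumed directly,
        // with no SEDA hop in between, for example:
        //   from(consumerUri("start1", 4, 8)).process(...);
        System.out.println(consumerUri("start1", 4, 8));
    }
}
```

The values would need tuning against the broker's prefetch settings, which this sketch does not touch.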

On Mon, Jan 30, 2012 at 12:06 PM, Sabin Timalsena <stimals...@veriskhealth.com> wrote:
> Well, I wanted to include SEDA for the parallel processing of messages.
> Maybe I should be trying something else? Like the Threads DSL maybe. I
> haven't taken a good look at the Threads DSL though. What would you
> recommend? My scenario basically is like this:
>
> - Messages arrive in an ActiveMQ queue
> - There are several consumers on that queue with different consumer
>   priorities, prefetch buffers etc.
> - Messages received by each consumer should be processed in parallel
> - Messages should not be dequeued unless they can be serviced, i.e. if one
>   consumer fails to process the message, the others should get that chance
>
> I'd gladly appreciate any suggestions.
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ib...@gmail.com]
> Sent: Monday, January 30, 2012 3:42 PM
> To: users@camel.apache.org
> Subject: Re: Using SEDA without losing messages
>
> On Mon, Jan 30, 2012 at 10:48 AM, Sabin Timalsena
> <stimals...@veriskhealth.com> wrote:
>> Hi
>>
>> Yes, I tried setting blockWhenFull=true. However it seems that the
>> caller blocks AFTER de-queuing the message from the activeMQ queue.
>> So, if the size of the seda queue is 4, then I'm having 5 messages
>> being dequeued, the fifth one probably waiting to get into the seda
>> queue, and might have to wait a long time. In my situation, I have
>> multiple distributed activeMQ consumers, each one feeding messages to a
>> SEDA queue for asynchronous processing.
>> Therefore, if the message is dequeued by a full SEDA queue, it will
>> have to wait on the full queue and can't be handled by other activeMQ
>> consumers.
>>
>
> Yes of course it does, the JMS consumer can pick up the message regardless
> of what the SEDA queue size is.
>
> You would need to use something like the throttling route policy, to be
> able to suspend/resume the route at runtime, so that it is only active
> while the SEDA queue is not full.
>
> Then you can code some custom logic in your route policy that checks whether
> the SEDA queue is full or not. If it's full, it suspends the route.
> And when the SEDA queue is no longer full, it resumes the route.
>
> http://camel.apache.org/routepolicy
>>
>> That said, why do you need the SEDA queues at all?
>> If you just process the message straight, then you only pick up new
>> messages from the JMS queue when the previous message has been fully
>> processed.
>>
>
>> I hope I have described my problem well. Please tell me if I'm not
>> clear enough.
>>
>> Thanks
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ib...@gmail.com]
>> Sent: Monday, January 30, 2012 3:03 PM
>> To: users@camel.apache.org
>> Subject: Re: Using SEDA without losing messages
>>
>> Hi
>>
>> Check the seda documentation.
>> http://camel.apache.org/seda
>>
>> There is a blockWhenFull option you can set to true to have the
>> caller block if the seda queue is full
>>
>> On Mon, Jan 30, 2012 at 7:20 AM, Sabin Timalsena
>> <stimals...@veriskhealth.com> wrote:
>>> Hello,
>>>
>>> I'm a beginner in camel and ActiveMQ and was recently trying to study
>>> the behavior of SEDA queues.
>>>
>>> I'm not sure I understand the "size" property of SEDA queues. Say the
>>> queue has (size=4) and (concurrentConsumers=4). 4 messages are
>>> brought into the queue. My understanding is that, as soon as the
>>> processing of those 4 messages is started, 4 more are brought into the
>>> SEDA queue.
>>> So my assumption was that 8 messages would be dequeued initially from
>>> the place this SEDA queue is consuming from. However, in my tests, 9
>>> messages were dequeued.
>>>
>>> Here's the setup I used for testing this behavior:
>>>
>>> private static final String SEDA_URI =
>>>     "seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";
>>>
>>> ...
>>>
>>> from("activemq:start1")
>>>     .wireTap("direct:wiretap")
>>>     .to(SEDA_URI);
>>>
>>> from(SEDA_URI)
>>>     .process(new Processor() {
>>>         @Override
>>>         public void process(Exchange ex) throws Exception {
>>>             Message in = ex.getIn();
>>>             LOGGER.info("Procesing Message: " + in.getBody());
>>>             Thread.sleep(10000);
>>>         }
>>>     });
>>>
>>> from("direct:wiretap")
>>>     .process(new Processor() {
>>>         @Override
>>>         public void process(Exchange exchange) throws Exception {
>>>             Message in = exchange.getIn();
>>>             LOGGER.info("Tapped Message: " + in.getBody());
>>>         }
>>>     });
>>>
>>> The "activemq:start1" has 20 messages initially.
>>>
>>> Here's the output obtained just after the test is started:
>>>
>>> [ient) thread #5 - seda://tasks] SEDATests INFO Procesing Message: Message 0
>>> [ient) thread #2 - seda://tasks] SEDATests INFO Procesing Message: Message 1
>>> [ient) thread #3 - seda://tasks] SEDATests INFO Procesing Message: Message 2
>>> [ient) thread #4 - seda://tasks] SEDATests INFO Procesing Message: Message 3
>>> [el-client) thread #6 - WireTap] SEDATests INFO Tapped Message: Message 0
>>> [el-client) thread #7 - WireTap] SEDATests INFO Tapped Message: Message 1
>>> [el-client) thread #8 - WireTap] SEDATests INFO Tapped Message: Message 2
>>> [el-client) thread #9 - WireTap] SEDATests INFO Tapped Message: Message 3
>>> [l-client) thread #10 - WireTap] SEDATests INFO Tapped Message: Message 4
>>> [l-client) thread #11 - WireTap] SEDATests INFO Tapped Message: Message 5
>>> [l-client) thread #12 - WireTap] SEDATests INFO Tapped Message: Message 6
>>> [l-client) thread #13 - WireTap] SEDATests INFO Tapped Message: Message 7
>>> [l-client) thread #14 - WireTap] SEDATests INFO Tapped Message: Message 8
>>>
>>> When I change the SEDA endpoint URI to have "size=1", 6 messages are
>>> dequeued initially. I don't understand why that extra one message is
>>> being dequeued. When full, does SEDA block *after* dequeuing the message?
>>>
>>> My objective is to prevent messages from being dequeued from
>>> "activemq:start1", if they won't be processed immediately.
>>>
>>> Please give me some suggestions
>>>
>>> Thanks in advance...
>>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cib...@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>

--
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/