Hi

For parallel processing, use the concurrentConsumers |
maxConcurrentConsumers on the JMS endpoint. That is better.


On Mon, Jan 30, 2012 at 12:06 PM, Sabin Timalsena
<stimals...@veriskhealth.com> wrote:
> Well, I wanted to include SEDA for the parallel processing of messages.
> Maybe I should be trying something else? Like the Threads DSL maybe. I
> haven't taken a good look at the Threads DSL though. What would you
> recommend? My scenario basically is like this:
>
> - Messages arrive in an ActiveMQ queue
> - There are several consumers on that queue with different consumer
> priorities, prefetch buffers etc.
> - Messages received be each consumer should be processed parallelly
> - Messages should not be dequeued unless they can be serviced, i.e. if one
> consumer fails to process the message, the others should get that chance
>
> I'd gladly appreciate any suggestions.
>
>
> -----Original Message-----
> From: Claus Ibsen [mailto:claus.ib...@gmail.com]
> Sent: Monday, January 30, 2012 3:42 PM
> To: users@camel.apache.org
> Subject: Re: Using SEDA without losing messages
>
> On Mon, Jan 30, 2012 at 10:48 AM, Sabin Timalsena
> <stimals...@veriskhealth.com> wrote:
>> Hi
>>
>> Yes, I tried setting blockWhenFull=true. However it seems that the
>> caller blocks AFTER de-queuing the message from the activeMQ queue.
>> So, if the size of the seda queue is 4, then I'm having 5 messages
>> being dequeued, the fifth one probably waiting to get into the seda
>> queue, and might have to wait a long time. In my situation, I have
>> multiple distributed activeMQ consumers, each one feeding messages to a
> SEDA queue for asynchronous processing.
>> Therefore, if the message is dequeued by a full SEDA queue, it will
>> have to wait on the full queue and can't be handled by other activeMQ
> consumers.
>>
>
> Yes of course it does, the JMS consumer can pickup the message regardless,
> what the SEDA queue size is.
>
> You would need to use something a like the throttling route policy, to be
> able at runtime to suspend/resume the route.
> So it only is active if the SEDA queue is not full.
>
> Then you can code some custom logic in your route policy, that checks if the
> SEDA queue is full or not. If its full then it suspend the route.
> And when the SEDA queue is no longer full, it resumes the route.
>
> http://camel.apache.org/routepolicy
>>
>> That said, why do you need the SEDA queues at all?
>> If you just process the message straight, then you only pickup new
> messages from the JMS queue, when the previous message has been fully
> processed.
>>
>
>> I hope I have described my problem well. Please tell me if I'm not
>> clear enough.
>>
>> Thanks
>>
>> -----Original Message-----
>> From: Claus Ibsen [mailto:claus.ib...@gmail.com]
>> Sent: Monday, January 30, 2012 3:03 PM
>> To: users@camel.apache.org
>> Subject: Re: Using SEDA without losing messages
>>
>> Hi
>>
>> Check the seda documentation.
>> http://camel.apache.org/seda
>>
>> There is a blockWhenFull option you can use to set true to have the
>> caller block if the seda queue is full
>>
>>
>>
>> On Mon, Jan 30, 2012 at 7:20 AM, Sabin Timalsena
>> <stimals...@veriskhealth.com> wrote:
>>> Hello,
>>>
>>>
>>>
>>> I'm a beginner in camel and ActiveMQ and was recently trying to study
>>> the behavior of SEDA queues.
>>>
>>> I'm not sure I understand the "size" property of SEDA queues. Say the
>>> queue has (size=4) and (concurrentConsumers=4). 4 messages are
>>> brought into the queue. My understanding is that, as soon as the
>>> processing of those 4 messages is started, 4 more are brought into the
> SEDA queue.
>>> So my assumption was that 8 messages would be dequeued initially from
>>> the place this SEDA queue is consuming from. However, in my tests, 9
>>> messages were dequeued.
>>>
>>>
>>>
>>> Here's the setup I used for testing this behavior:
>>>
>>>
>>>
>>>                                private static final String SEDA_URI =
>>> "seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";
>>>
>>>                                ...
>>>
>>>
>>>
>>>                                from("activemq:start1")
>>>
>>>                                .wireTap("direct:wiretap")
>>>
>>>                                .to(SEDA_URI);
>>>
>>>
>>>
>>>                                from(SEDA_URI)
>>>
>>>                                .process(new Processor() {
>>>
>>>
>>>
>>>                                                @Override
>>>
>>>                                                public void
>>> process(Exchange
>>> ex) throws Exception {
>>>
>>>
>>> Message in = ex.getIn();
>>>
>>>
>>> LOGGER.info("Procesing Message: " + in.getBody());
>>>
>>>
>>>
>>> Thread.sleep(10000);
>>>
>>>                                                }
>>>
>>>                                });
>>>
>>>
>>>
>>>                                from("direct:wiretap")
>>>
>>>                                .process(new Processor() {
>>>
>>>
>>>
>>>                                                @Override
>>>
>>>                                                public void
>>> process(Exchange
>>> exchange) throws Exception {
>>>
>>>
>>> Message in = exchange.getIn();
>>>
>>>
>>> LOGGER.info("Tapped Message: " + in.getBody());
>>>
>>>                                                }
>>>
>>>                                });
>>>
>>>
>>>
>>> The "activemq:start1" has 20 messages initially.
>>>
>>> Here's the output obtained just after the test is started:
>>>
>>>
>>>
>>> [ient) thread #5 - seda://tasks] SEDATests                      INFO
>>> Procesing Message: Message 0
>>>
>>> [ient) thread #2 - seda://tasks] SEDATests                      INFO
>>> Procesing Message: Message 1
>>>
>>> [ient) thread #3 - seda://tasks] SEDATests                      INFO
>>> Procesing Message: Message 2
>>>
>>> [ient) thread #4 - seda://tasks] SEDATests                      INFO
>>> Procesing Message: Message 3
>>>
>>> [el-client) thread #6 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 0
>>>
>>> [el-client) thread #7 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 1
>>>
>>> [el-client) thread #8 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 2
>>>
>>> [el-client) thread #9 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 3
>>>
>>> [l-client) thread #10 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 4
>>>
>>> [l-client) thread #11 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 5
>>>
>>> [l-client) thread #12 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 6
>>>
>>> [l-client) thread #13 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 7
>>>
>>> [l-client) thread #14 - WireTap] SEDATests                      INFO
>>> Tapped
>>> Message: Message 8
>>>
>>>
>>>
>>> When I change the SEDA endpoint URI to have "size=1", 6 messages are
>>> dequeued initially. I don't understand why that extra one message is
>>> being dequeued. When full, does SEDA block *after* dequeuing the message?
>>>
>>>
>>>
>>> My objective is to prevent messages from being dequeued from
>>> "activemq:start1", if they won't be processed immediately.
>>>
>>> Please give me some suggestions
>>>
>>>
>>>
>>> Thanks in advance...
>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> FuseSource
>> Email: cib...@fusesource.com
>> Web: http://fusesource.com
>> Twitter: davsclaus, fusenews
>> Blog: http://davsclaus.blogspot.com/
>> Author of Camel in Action: http://www.manning.com/ibsen/
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cib...@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Reply via email to