Hi

Yes, I tried setting blockWhenFull=true. However it seems that the caller
blocks AFTER de-queuing the message from the activeMQ queue. So, if the size
of the seda queue is 4, then I'm having 5 messages being dequeued, the fifth
one probably waiting to get into the seda queue, and might have to wait a
long time. In my situation, I have multiple distributed activeMQ consumers,
each one feeding messages to a SEDA queue for asynchronous processing.
Therefore, if the message is dequeued by a full SEDA queue, it will have to
wait on the full queue and can't be handled by other activeMQ consumers.

I hope I have described my problem well. Please tell me if I'm not clear
enough.

Thanks

-----Original Message-----
From: Claus Ibsen [mailto:claus.ib...@gmail.com] 
Sent: Monday, January 30, 2012 3:03 PM
To: users@camel.apache.org
Subject: Re: Using SEDA without losing messages

Hi

Check the seda documentation.
http://camel.apache.org/seda

There is a blockWhenFull option you can use to set true to have the caller
block if the seda queue is full



On Mon, Jan 30, 2012 at 7:20 AM, Sabin Timalsena
<stimals...@veriskhealth.com> wrote:
> Hello,
>
>
>
> I'm a beginner in camel and ActiveMQ and was recently trying to study 
> the behavior of SEDA queues.
>
> I'm not sure I understand the "size" property of SEDA queues. Say the  
> queue has (size=4) and (concurrentConsumers=4). 4 messages are brought 
> into the queue. My understanding is that, as soon as the processing of 
> those 4 messages is started, 4 more are brought into the SEDA queue. 
> So my assumption was that 8 messages would be dequeued initially from 
> the place this SEDA queue is consuming from. However, in my tests, 9 
> messages were dequeued.
>
>
>
> Here's the setup I used for testing this behavior:
>
>
>
>                                private static final String SEDA_URI = 
> "seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";
>
>                                ...
>
>
>
>                                from("activemq:start1")
>
>                                .wireTap("direct:wiretap")
>
>                                .to(SEDA_URI);
>
>
>
>                                from(SEDA_URI)
>
>                                .process(new Processor() {
>
>
>
>                                                @Override
>
>                                                public void 
> process(Exchange
> ex) throws Exception {
>
>                                                                Message 
> in = ex.getIn();
>
>
> LOGGER.info("Procesing Message: " + in.getBody());
>
>
>
> Thread.sleep(10000);
>
>                                                }
>
>                                });
>
>
>
>                                from("direct:wiretap")
>
>                                .process(new Processor() {
>
>
>
>                                                @Override
>
>                                                public void 
> process(Exchange
> exchange) throws Exception {
>
>                                                                Message 
> in = exchange.getIn();
>
>
> LOGGER.info("Tapped Message: " + in.getBody());
>
>                                                }
>
>                                });
>
>
>
> The "activemq:start1" has 20 messages initially.
>
> Here's the output obtained just after the test is started:
>
>
>
> [ient) thread #5 - seda://tasks] SEDATests                      INFO 
> Procesing Message: Message 0
>
> [ient) thread #2 - seda://tasks] SEDATests                      INFO 
> Procesing Message: Message 1
>
> [ient) thread #3 - seda://tasks] SEDATests                      INFO 
> Procesing Message: Message 2
>
> [ient) thread #4 - seda://tasks] SEDATests                      INFO 
> Procesing Message: Message 3
>
> [el-client) thread #6 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 0
>
> [el-client) thread #7 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 1
>
> [el-client) thread #8 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 2
>
> [el-client) thread #9 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 3
>
> [l-client) thread #10 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 4
>
> [l-client) thread #11 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 5
>
> [l-client) thread #12 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 6
>
> [l-client) thread #13 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 7
>
> [l-client) thread #14 - WireTap] SEDATests                      INFO  
> Tapped
> Message: Message 8
>
>
>
> When I change the SEDA endpoint URI to have "size=1", 6 messages are 
> dequeued initially. I don't understand why that extra one message is 
> being dequeued. When full, does SEDA block *after* dequeuing the message?
>
>
>
> My objective is to prevent messages from being dequeued from 
> "activemq:start1", if they won't be processed immediately.
>
> Please give me some suggestions
>
>
>
> Thanks in advance...
>



--
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Reply via email to