Hello,

 

I'm a beginner in camel and ActiveMQ and was recently trying to study the
behavior of SEDA queues. 

I'm not sure I understand the "size" property of SEDA queues. Say the  queue
has (size=4) and (concurrentConsumers=4). 4 messages are brought into the
queue. My understanding is that, as soon as the processing of those 4
messages is started, 4 more are brought into the SEDA queue. So my
assumption was that 8 messages would be dequeued initially from the place
this SEDA queue is consuming from. However, in my tests, 9 messages were
dequeued.

 

Here's the setup I used for testing this behavior:

 

                                private static final String SEDA_URI =
"seda:tasks?size=4&concurrentConsumers=4&blockWhenFull=true";

                                ...

                                

                                from("activemq:start1")

                                .wireTap("direct:wiretap")

                                .to(SEDA_URI);

                                

                                from(SEDA_URI)

                                .process(new Processor() {

                                                

                                                @Override

                                                public void process(Exchange
ex) throws Exception {

                                                                Message in =
ex.getIn();

 
LOGGER.info("Procesing Message: " + in.getBody());


 
Thread.sleep(10000);

                                                }

                                });

                                

                                from("direct:wiretap")

                                .process(new Processor() {

                                                

                                                @Override

                                                public void process(Exchange
exchange) throws Exception {

                                                                Message in =
exchange.getIn();

 
LOGGER.info("Tapped Message: " + in.getBody());

                                                }

                                });

 

The "activemq:start1" has 20 messages initially.

Here's the output obtained just after the test is started:

 

[ient) thread #5 - seda://tasks] SEDATests                      INFO
Procesing Message: Message 0

[ient) thread #2 - seda://tasks] SEDATests                      INFO
Procesing Message: Message 1

[ient) thread #3 - seda://tasks] SEDATests                      INFO
Procesing Message: Message 2

[ient) thread #4 - seda://tasks] SEDATests                      INFO
Procesing Message: Message 3

[el-client) thread #6 - WireTap] SEDATests                      INFO  Tapped
Message: Message 0

[el-client) thread #7 - WireTap] SEDATests                      INFO  Tapped
Message: Message 1

[el-client) thread #8 - WireTap] SEDATests                      INFO  Tapped
Message: Message 2

[el-client) thread #9 - WireTap] SEDATests                      INFO  Tapped
Message: Message 3

[l-client) thread #10 - WireTap] SEDATests                      INFO  Tapped
Message: Message 4

[l-client) thread #11 - WireTap] SEDATests                      INFO  Tapped
Message: Message 5

[l-client) thread #12 - WireTap] SEDATests                      INFO  Tapped
Message: Message 6

[l-client) thread #13 - WireTap] SEDATests                      INFO  Tapped
Message: Message 7

[l-client) thread #14 - WireTap] SEDATests                      INFO  Tapped
Message: Message 8

 

When I change the SEDA endpoint URI to have "size=1", 6 messages are
dequeued initially. I don't understand why that extra one message is being
dequeued. When full, does SEDA block *after* dequeuing the message?

 

My objective is to prevent messages from being dequeued from
"activemq:start1", if they won't be processed immediately.

Please give me some suggestions

 

Thanks in advance...

Reply via email to