Hi Arnaud,

I think what may be happening is that you first consumer is grabbing all of
the messages from the queue that are available.  The default message poll
size is unlimited so your other 4 consumers are polling for nothing on the
queue.  What you might want to try is setting the maxMessagesPerPoll to a
value.  This will reduce the number of messages a consumer retrieves for
each poll.

The other thing I have done is set Greedy=true when using concurrent
consumers if you're looking to increase throughput.  This will cause the
consumer to make another poll immediately without waiting for the next
delay if the previous poll returned at least 1 message.

Example:

aws-sqs://my-queue?greedy=true&maxMessagesPerPoll=10&concurrentConsumers=5

So in this example, each consumer will pull no more than 10 messages per
poll to the SQS Queue.  So if there are less than 10 messages on your
queue, the other consumers aren't going to really get an messages.

On Mon, Jan 31, 2022 at 7:15 AM Arnaud Level <arn...@toro-intl.com> wrote:

> Hi Narsi,
> It's a fifo but I actually tried the simple code I've posted on a standard
> one too and observed the same behavior.
>
> On Mon, Jan 31, 2022 at 1:11 PM Narsi Reddy Nallamilli <
> narsi.nallami...@gmail.com> wrote:
>
> > Hi Arnaud,
> >
> > Is your AWS queue type fifo or standard?
> >
> > On Mon, 31 Jan, 2022, 17:31 Arnaud Level, <arn...@toro-intl.com> wrote:
> >
> > > Hi,
> > >
> > > (Camel version: camel-aws2-sqs-starter: 3.12.0)
> > >
> > > I am trying to use and understand concurentConsumers with a SQS queue:
> > >
> > >
> >
> from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&
> > > waitTimeSeconds=20")
> > >                 .process(exchange -> {
> > >                     System.out.println("Message received...");
> > >                     })
> > >                 .process(exchange -> {
> > >                     try {
> > >                         Thread.sleep(5000);
> > >                     } catch (InterruptedException e) {
> > >                         e.printStackTrace();
> > >                     }});
> > >
> > > With the above queue, if I send 3 messages at the same time, I have to
> > wait
> > > 5 seconds to see the second message ("Message received...") and 5 more
> > > seconds to see the third one. My understanding of concurentConsumers is
> > > that with a value of 5 I would see the 3 messages at the same time
> since
> > 3
> > > threads will consume them in parallel. If I add the Thread.sleep in a
> > seda
> > > route, I'm having this behavior (= The 3 messages are read at the same
> > > time).
> > >
> > > Turning on the Camel logs it seems that the next polling is done only
> > after
> > > the Delete for the previous message is sent (which is with a delay of
> > 5s).
> > >
> > > I would understand the above behavior with concurentConsumers=1 but I
> > don't
> > > with concurentConsumers=5. Could someone tell me what I've
> misunderstood
> > ?
> > >
> > > Thank you in advance!
> > > Arnaud
> > >
> >
>

Reply via email to