Yup, I tested by putting more than 5 messages on the queue, and when I started
the route Arnaud specified, it polled only one message at a time.

On Thu, Feb 3, 2022 at 9:23 AM Chirag <chirag.sangh...@gmail.com> wrote:

> Can you please post or point to a prototype?
> Have you tried putting a large number of messages on the queue?
>
> On Wed, Feb 2, 2022, 17:35 Narsi Reddy Nallamilli <
> narsi.nallami...@gmail.com> wrote:
>
> > Hi Arnaud,
> >
> > Yup, it used to work before: I use it in my project and was not aware of
> > the bug until you reported it.
> >
> > Can you open a Jira issue for it?
> >
> > On Thu, 3 Feb, 2022, 01:00 Arnaud Level, <arn...@toro-intl.com> wrote:
> >
> > > Hi again Narsi,
> > >
> > > I read more about the *delay* component and you are right! Using it with a
> > > custom (or even the default) thread pool profile, we get the expected
> > > behavior without exhausting the thread pool as I had feared. This is a
> > > workaround for the bug where concurrentConsumers is not taken into account.
> > > I still think we should open an issue, as this behavior is supposed to be
> > > supported by the concurrentConsumers URI parameter of the SQS component.
> > > What do you think?
> > >
> > > Thanks for your help again!
> > > Arnaud
> > >
> > > On Wed, Feb 2, 2022 at 6:42 PM Arnaud Level <arn...@toro-intl.com> wrote:
> > >
> > > > Hi Narsi,
> > > >
> > > > Thank you for the reply! Indeed it works, but not exactly as expected. I
> > > > am not sure I understand the behavior, because with this delay the
> > > > consumption is no longer limited to the number of concurrent consumers.
> > > > If I set concurrentConsumers=3 and send 5 messages (which are read one by
> > > > one, in 5 polls), I see all 5 messages consumed immediately, even though
> > > > each of them then sleeps for 5 seconds.
> > > > The expected behavior would be to see 3 immediately and then the next 2
> > > > once two of the first 3 wake up (i.e. after the 5 s sleep).
> > > >
> > > > From a behavior perspective, it looks like a new thread is created every
> > > > time a message is available on the queue, without any limit. If this is
> > > > the case, we can easily end up with an exhausted thread pool. I had a
> > > > quick look at the delay component, and it is async by default, which
> > > > would explain the behavior we are seeing. What do you think?
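
To make the difference between the expected and the observed behavior concrete, here is a minimal plain-JDK sketch. It does not use Camel or SQS at all: the class name ConsumerPoolSketch, the helper peakConcurrency and the shortened 300 ms sleep are assumptions made up for illustration. A fixed pool of 3 threads stands in for concurrentConsumers=3, and a cached (unbounded) pool stands in for the "new thread per message" behavior described above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration only: no Camel or SQS calls are made here.
final class ConsumerPoolSketch {

    // Runs `messages` tasks that each sleep `workMillis` on the given pool and
    // returns the highest number of tasks that were running at the same time.
    static int peakConcurrency(ExecutorService pool, int messages, long workMillis) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                pool.execute(() -> {
                    // Record how many tasks are in flight right now.
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(workMillis); // stands in for the 5 s sleep
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
            pool.shutdown(); // accept no new tasks, let queued ones finish
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Bounded: 3 workers, 5 messages, so no more than 3 run at once.
        System.out.println("fixed(3): "
                + peakConcurrency(Executors.newFixedThreadPool(3), 5, 300));
        // Unbounded: one thread per task, so all 5 can run at once.
        System.out.println("cached:   "
                + peakConcurrency(Executors.newCachedThreadPool(), 5, 300));
    }
}
```

With the fixed pool, the last 2 tasks only start once a worker frees up, which is the behavior expected from concurrentConsumers=3. With the cached pool, nothing limits the number of tasks in flight. Whether the delay component really behaves like the second case is only a guess based on what is described in this thread.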
> > > >
> > > > Regarding the concurrentConsumers parameter, if this is a bug, I believe
> > > > it is a regression, because according to this article
> > > > <https://blog.christianposta.com/camel/very-fast-camels-and-cloud-messaging/>
> > > > it has worked in the past. Shouldn't we open an issue?
> > > >
> > > > Thanks again!
> > > > Arnaud
> > > >
> > > > On Wed, Feb 2, 2022 at 5:52 PM Narsi Reddy Nallamilli <
> > > > narsi.nallami...@gmail.com> wrote:
> > > >
> > > >> Hi Arnaud,
> > > >>
> > > >> Yes, you are correct: the 'concurrentConsumers' URI attribute is not
> > > >> working as expected, and this looks like a bug.
> > > >>
> > > >> However, I found that when you use delay in the route, it works as
> > > >> expected.
> > > >>
> > > >> from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&waitTimeSeconds=20")
> > > >>     // Workaround: a minimal delay before processing
> > > >>     .delay(1L)
> > > >>     .process(exchange -> System.out.println("Message received..."))
> > > >>     .process(exchange -> {
> > > >>         try {
> > > >>             // Simulate 5 seconds of work per message
> > > >>             Thread.sleep(5000);
> > > >>         } catch (InterruptedException e) {
> > > >>             // Restore the interrupt flag instead of swallowing it
> > > >>             Thread.currentThread().interrupt();
> > > >>         }
> > > >>     });
> > > >>
> > > >> try it and let me know.
> > > >>
> > > >> On Mon, Jan 31, 2022 at 11:48 PM Arnaud Level <arn...@toro-intl.com>
> > > >> wrote:
> > > >>
> > > >> > Hi Larry,
> > > >> >
> > > >> > Thank you for your message! Your reply makes sense to me, and I tried
> > > >> > what you suggested with a queue like:
> > > >> >
> > > >> > from("aws2-sqs://aramark-notifications?maxMessagesPerPoll=1&concurrentConsumers=5&amazonSQSClient=#sqsClient")
> > > >> >
> > > >> > but nothing changes: I still see the messages with a delay of 5 seconds
> > > >> > between each of them.
> > > >> >
> > > >> > If I turn on tracing
> > > >> > (logging.level.org.apache.camel.component.aws2.sqs=TRACE), I see that
> > > >> > the next poll is only triggered after the Delete for a consumed message
> > > >> > is sent. In the logs I see "1 message received", but no further poll
> > > >> > occurs until the Delete is sent, which happens only after the 5-second
> > > >> > sleep. Only 1 message is read per poll, and I should see at least 5 of
> > > >> > them since concurrentConsumers=5.
> > > >> > It looks to me like there is still only 1 concurrent consumer, and I
> > > >> > don't understand why.
> > > >> >
> > > >> > Arnaud
> > > >> >
> > > >> > On Mon, Jan 31, 2022 at 3:27 PM Larry Shields <larry.shie...@gmail.com>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi Arnaud,
> > > >> > >
> > > >> > > I think what may be happening is that your first consumer is grabbing
> > > >> > > all of the messages that are available on the queue. The default
> > > >> > > message poll size is unlimited, so your other 4 consumers are polling
> > > >> > > for nothing on the queue. What you might want to try is setting
> > > >> > > maxMessagesPerPoll to a value. This will reduce the number of messages
> > > >> > > a consumer retrieves for each poll.
> > > >> > >
> > > >> > > The other thing I have done is set greedy=true when using concurrent
> > > >> > > consumers, if you're looking to increase throughput. This will cause
> > > >> > > the consumer to make another poll immediately, without waiting for the
> > > >> > > next delay, if the previous poll returned at least 1 message.
> > > >> > >
> > > >> > > Example:
> > > >> > >
> > > >> > > aws-sqs://my-queue?greedy=true&maxMessagesPerPoll=10&concurrentConsumers=5
> > > >> > >
> > > >> > > So in this example, each consumer will pull no more than 10 messages
> > > >> > > per poll from the SQS queue. If there are fewer than 10 messages on
> > > >> > > your queue, the other consumers aren't really going to get any
> > > >> > > messages.
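
The effect of the per-poll limit described above can be sketched with a small plain-JDK simulation. This is not Camel or SQS code: the class PollBatchSketch and the method messagesPerConsumer are hypothetical names, and the model is deliberately simplified, assuming each consumer polls exactly once, one after the other, and takes at most maxPerPoll messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-JDK simulation only: no Camel or SQS calls are made here.
final class PollBatchSketch {

    // Each simulated consumer polls once, in order, and takes at most
    // maxPerPoll messages. Returns how many messages each consumer received.
    static List<Integer> messagesPerConsumer(int queued, int consumers, int maxPerPoll) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) {
            queue.add(i);
        }
        List<Integer> taken = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            int count = 0;
            while (count < maxPerPoll && !queue.isEmpty()) {
                queue.poll(); // remove one message from the head of the queue
                count++;
            }
            taken.add(count);
        }
        return taken;
    }

    public static void main(String[] args) {
        // 8 messages, limit 10: the first consumer drains the queue.
        System.out.println(messagesPerConsumer(8, 5, 10)); // [8, 0, 0, 0, 0]
        // 8 messages, limit 2: the messages are spread over four consumers.
        System.out.println(messagesPerConsumer(8, 5, 2));  // [2, 2, 2, 2, 0]
    }
}
```

In this simplified model a smaller limit spreads the messages over more consumers, which is the reason for trying a low maxMessagesPerPoll together with concurrentConsumers.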
> > > >> > >
> > > >> > > On Mon, Jan 31, 2022 at 7:15 AM Arnaud Level <arn...@toro-intl.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi Narsi,
> > > >> > > > It's a FIFO queue, but I also tried the simple code I posted on a
> > > >> > > > standard one and observed the same behavior.
> > > >> > > >
> > > >> > > > On Mon, Jan 31, 2022 at 1:11 PM Narsi Reddy Nallamilli <
> > > >> > > > narsi.nallami...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Hi Arnaud,
> > > >> > > > >
> > > >> > > > > Is your AWS queue type fifo or standard?
> > > >> > > > >
> > > >> > > > > On Mon, 31 Jan, 2022, 17:31 Arnaud Level, <arn...@toro-intl.com>
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > > > Hi,
> > > >> > > > > >
> > > >> > > > > > (Camel version: camel-aws2-sqs-starter: 3.12.0)
> > > >> > > > > >
> > > >> > > > > > I am trying to use and understand concurrentConsumers with an SQS
> > > >> > > > > > queue:
> > > >> > > > > >
> > > >> > > > > > from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&waitTimeSeconds=20")
> > > >> > > > > >     .process(exchange -> System.out.println("Message received..."))
> > > >> > > > > >     .process(exchange -> {
> > > >> > > > > >         try {
> > > >> > > > > >             // Simulate 5 seconds of work per message
> > > >> > > > > >             Thread.sleep(5000);
> > > >> > > > > >         } catch (InterruptedException e) {
> > > >> > > > > >             e.printStackTrace();
> > > >> > > > > >         }
> > > >> > > > > >     });
> > > >> > > > > >
> > > >> > > > > > With the above route, if I send 3 messages at the same time, I
> > > >> > > > > > have to wait 5 seconds to see the second message ("Message
> > > >> > > > > > received...") and 5 more seconds to see the third one. My
> > > >> > > > > > understanding of concurrentConsumers is that with a value of 5 I
> > > >> > > > > > would see the 3 messages at the same time, since 3 threads would
> > > >> > > > > > consume them in parallel. If I put the Thread.sleep in a seda
> > > >> > > > > > route, I do get this behavior (i.e. the 3 messages are read at
> > > >> > > > > > the same time).
> > > >> > > > > >
> > > >> > > > > > Turning on the Camel logs, it seems that the next poll is done
> > > >> > > > > > only after the Delete for the previous message is sent (which
> > > >> > > > > > happens after a delay of 5 s).
> > > >> > > > > >
> > > >> > > > > > I would understand the above behavior with concurrentConsumers=1,
> > > >> > > > > > but I don't with concurrentConsumers=5. Could someone tell me what
> > > >> > > > > > I've misunderstood?
> > > >> > > > > >
> > > >> > > > > > Thank you in advance!
> > > >> > > > > > Arnaud
> > > >> > > > > >
