Hi Claus Ibsen,

I was able to create the JIRA ticket as requested.
Please find the JIRA ticket here -
https://issues.apache.org/jira/browse/CAMEL-21689
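
For anyone picking up the ticket, here is a minimal stand-alone sketch of the
"hasExpiredRecords" condition quoted further down in this thread. It is not
Camel code. The class and method names (PollTimeoutSketch,
secondsUntilFirstDispatch), the one-poll-per-second cadence, and the
assumption that the timer is never reset are all hypothetical and chosen only
for illustration. It models why a steady stream of non-empty polls would
reach the size limit before the timeout branch is ever taken.

```java
/** Simplified model of the quoted expiry check; NOT the Camel implementation. */
final class PollTimeoutSketch {

    /** Mirrors the quoted condition with plain numbers instead of Camel/Kafka types. */
    static boolean hasExpiredRecords(int cachedExchanges, int polledRecords,
                                     long elapsedMs, long pollTimeoutMs) {
        return cachedExchanges > 0 && polledRecords == 0 && elapsedMs >= pollTimeoutMs;
    }

    /**
     * Assumes (hypothetically) one poll per second, each returning recordsPerPoll
     * records, and a timer that is never reset. Returns the simulated second at
     * which the first batch is dispatched, or -1 if none is dispatched in time.
     */
    static long secondsUntilFirstDispatch(int recordsPerPoll, int maxPollRecords,
                                          long pollTimeoutMs, long maxSeconds) {
        int cached = 0;
        for (long second = 1; second <= maxSeconds; second++) {
            long elapsedMs = second * 1000;
            if (hasExpiredRecords(cached, recordsPerPoll, elapsedMs, pollTimeoutMs)) {
                return second; // timeout-based dispatch of an incomplete batch
            }
            cached += recordsPerPoll;
            if (cached >= maxPollRecords) {
                return second; // size-based dispatch of a full batch
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // pollTimeoutMs=10000 and maxPollRecords=50000, as in the route from this thread
        System.out.println(secondsUntilFirstDispatch(100, 50_000, 10_000, 2_000)); // prints 500
        System.out.println(secondsUntilFirstDispatch(50, 50_000, 10_000, 2_000));  // prints 1000
    }
}
```

Under these assumptions the first dispatch happens after 500 to 1000 simulated
seconds, which matches the delay we observed, and the 10 second timeout never
takes effect because no poll comes back empty.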

Thank you,
Manjunath S Horapeti

On Fri, Jan 31, 2025 at 1:06 AM Manjunath.S Horapeti <
manjunaths.horap...@gmail.com> wrote:

> Hi Claus Ibsen,
>
> Thank you for the response.
>
> I have requested a JIRA account so that I can create a ticket.
> In the meantime, I can provide some more detail on what the images
> showed.
> The first image was a snippet of the documentation on the Camel site:
> https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer
>
> The second and third images showed code from the class
> "KafkaRecordBatchingProcessor" in the package
> "org.apache.camel.component.kafka.consumer.support.batching" from the jar
> "camel-kafka:4.8.3".
> In the method
> public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer,
> ConsumerRecords<Object, Object> consumerRecords)
> line 123 has the condition below:
>
>         if (hasExpiredRecords(consumerRecords)) {
>             LOG.debug(
>                     "The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing",
>                     exchangeList.size());
>
>             // poll timeout has elapsed, so check for expired records
>             processBatch(camelKafkaConsumer);
>             exchangeList.clear();
>
>             return ProcessingResult.newUnprocessed();
>         }
>
> Below is the definition of method "hasExpiredRecords" in line 152 of the
> same class
>     private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
>         return !exchangeList.isEmpty() && consumerRecords.isEmpty()
>                 && watch.taken() >= configuration.getPollTimeoutMs();
>     }
>
> We believe the above condition is never met in the scenario described
> earlier in this mail chain.
> The topic receives 50-100 messages per second, so every poll returns at
> least one record. "consumerRecords.isEmpty()" is therefore always false,
> and pollTimeoutMs is never checked.
> As a result, the route waits until 50000 records have been pulled from the
> topic before processing further.
>
> I will raise the JIRA ticket as requested once the account request is
> approved. I just wanted to provide some context about the images in case
> it helps.
>
>
> Thank you,
> Manjunath S Horapeti
>
>
> On Thu, Jan 30, 2025 at 2:35 PM Claus Ibsen <claus.ib...@gmail.com> wrote:
>
>> Hi
>>
>> We cannot see the image attached to this email. Maybe you can create a
>> JIRA ticket and attach the screenshots there.
>>
>> On Wed, Jan 29, 2025 at 8:38 PM Manjunath.S Horapeti <
>> manjunaths.horap...@gmail.com> wrote:
>>
>> > Hi Team,
>> >
>> > I'm reaching out about the behaviour of "pollTimeoutMs" and a
>> > discrepancy we observed.
>> >
>> > [image: image.png]
>> >
>> > As per the documentation above from the Camel Kafka component page (
>> > https://camel.apache.org/components/4.8.x/kafka-component.html), 4.8.x
>> > version, pollTimeoutMs works in tandem with "maxPollRecords": the
>> > consumer either polls "maxPollRecords" records or blocks for a maximum
>> > of "pollTimeoutMs".
>> >
>> > But the behaviour we observed was that the Camel route kept waiting
>> > until the "maxPollRecords" count was reached and only then processed
>> > further. For example, our route is as follows:
>> >
>> > from("kafka:topic-name?brokers=brokers&groupId=groupid&pollTimeoutMs=10000&batching=true&maxPollRecords=50000")
>> >         .bean(this, "methodName");
>> >
>> > This route always waits until 50000 records are present in the topic
>> > before processing further, ignoring the pollTimeoutMs of 10000
>> > (10 seconds). That is, if the producer is producing messages at a rate
>> > of 50-100 per second, the application waits roughly 500-1000 seconds,
>> > until the 50000 record count is met, before proceeding.
>> >
>> > We believe that the code below from "KafkaRecordBatchingProcessor" is
>> > never executed because there are always one or two messages in each
>> > poll, and hence the condition consumerRecords.isEmpty() in the method
>> > "hasExpiredRecords" is always false.
>> > We believe this makes the application wait until maxPollRecords
>> > (50000) is reached before proceeding.
>> > [image: image.png]
>> >
>> > [image: image.png]
>> >
>> > Can you please let us know whether the above behaviour is expected?
>> > If so, how can we either pull Y messages from the topic or process
>> > whatever messages are received within X seconds (the exact behaviour
>> > described in the Batching Consumer documentation above)?
>> >
>> > Your response would be much appreciated.
>> >
>> >
>> > --
>> > Thank you,
>> > Manjunath S Horapeti
>> >
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
>>
>
