Hi Claus Ibsen,

I was able to create the JIRA ticket as requested. Please find it here: https://issues.apache.org/jira/browse/CAMEL-21689

Thank you,
Manjunath S Horapeti

On Fri, Jan 31, 2025 at 1:06 AM Manjunath.S Horapeti <manjunaths.horap...@gmail.com> wrote:

> Hi Claus Ibsen,
>
> Thank you for the response.
>
> I have requested a JIRA account so that I can create a ticket.
> In the meantime, I can provide some more detail on what the images showed.
>
> The first image was a snippet of the documentation on the Camel site
> (https://camel.apache.org/components/4.8.x/kafka-component.html#_batching_consumer);
> this link takes you to the same page.
>
> The second and third images showed code from the class
> "KafkaRecordBatchingProcessor" in the package
> "org.apache.camel.component.kafka.consumer.support.batching" from the jar
> "camel-kafka:4.8.3", in the method
>
>     public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer,
>             ConsumerRecords<Object, Object> consumerRecords)
>
> Line 123 has the condition below:
>
>     if (hasExpiredRecords(consumerRecords)) {
>         LOG.debug(
>                 "The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing",
>                 exchangeList.size());
>
>         // poll timeout has elapsed, so check for expired records
>         processBatch(camelKafkaConsumer);
>         exchangeList.clear();
>
>         return ProcessingResult.newUnprocessed();
>     }
>
> Below is the definition of the method "hasExpiredRecords" at line 152 of the
> same class:
>
>     private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
>         return !exchangeList.isEmpty() && consumerRecords.isEmpty()
>                 && watch.taken() >= configuration.getPollTimeoutMs();
>     }
>
> We believe the above condition is never met in the scenario described in
> the trailing mail chain.
> The topic produces messages at a rate of 50-100 messages per second, so
> "consumerRecords.isEmpty()" always returns false and the pollTimeoutMs
> check is never reached.
> Because of this, the route waits until all 50000 records have been pulled
> from the topic before processing further.
>
> I will raise the JIRA ticket as requested once the account request is
> approved; I just wanted to provide some context about the images in case
> that helps.
>
> Thank you,
> Manjunath S Horapeti
>
> On Thu, Jan 30, 2025 at 2:35 PM Claus Ibsen <claus.ib...@gmail.com> wrote:
>
>> Hi
>>
>> We cannot see the image attached to this email. Maybe you can create a
>> JIRA ticket and attach the screenshots there.
>>
>> On Wed, Jan 29, 2025 at 8:38 PM Manjunath.S Horapeti <
>> manjunaths.horap...@gmail.com> wrote:
>>
>> > Hi Team,
>> >
>> > I'm reaching out about the behaviour of "pollTimeoutMs" and a
>> > discrepancy we observed.
>> >
>> > [image: image.png]
>> >
>> > As per the documentation above from the Camel Kafka component page
>> > (https://camel.apache.org/components/4.8.x/kafka-component.html), 4.8.x
>> > version, pollTimeoutMs works in tandem with "maxPollRecords": the
>> > consumer either polls "maxPollRecords" records or blocks for a maximum
>> > of "pollTimeoutMs".
>> >
>> > However, the behaviour we observed was that the Camel route kept waiting
>> > until the "maxPollRecords" count was reached and only then processed
>> > further. For example, our route is as follows:
>> >
>> > from("kafka:topic-name?brokers=brokers&groupId=groupid&pollTimeoutMs=10000&batching=true&maxPollRecords=50000").bean(this,
>> > "methodName");
>> >
>> > This route always waits until 50000 records are present in the topic and
>> > only then processes further, ignoring the pollTimeoutMs of 10000 (10
>> > seconds). That is, if the producer is producing messages at a rate of
>> > 50-100 per second, the application waits for roughly 500-1000 seconds,
>> > until the 50000 record count is met, before proceeding.
>> >
>> > We believe that the code below from "KafkaRecordBatchingProcessor" is
>> > never executed, because there are always one or two messages in each
>> > poll, and hence the method "hasExpiredRecords", which requires
>> > consumerRecords.isEmpty(), always returns false.
>> > We believe this makes the application wait until maxPollRecords
>> > (50000) is reached before proceeding.
>> >
>> > [image: image.png]
>> >
>> > [image: image.png]
>> >
>> > Can you please let us know whether the above behaviour is expected and,
>> > if so, how to pull Y messages from the topic or write whatever messages
>> > are received within X seconds (the exact behaviour described in the
>> > Batching Consumer documentation above)?
>> >
>> > Your response would be much appreciated.
>> >
>> > --
>> > Thank you,
>> > Manjunath S Horapeti
>>
>> --
>> Claus Ibsen
>> -----------------
>> @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
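
The reasoning in this thread can be sketched in a few lines of plain Java. This is a simplified model and not Camel's implementation: the class name BatchTimeoutSketch, the method firstDispatchMs, the one-poll-per-second cadence, and the variant hasExpiredRecordsIgnoringPoll are all assumptions made for illustration. Only the shape of the first predicate follows the "hasExpiredRecords" method quoted in the thread.

```java
// Simplified, hypothetical model of the batching check discussed in the thread.
// It does not use Camel or Kafka classes.
class BatchTimeoutSketch {

    // Same shape as the quoted predicate: a timeout flush needs cached
    // records, an EMPTY poll, and an elapsed time at or past the timeout.
    static boolean hasExpiredRecords(int cached, int polled, long elapsedMs, long pollTimeoutMs) {
        return cached > 0 && polled == 0 && elapsedMs >= pollTimeoutMs;
    }

    // Hypothetical variant (an assumption, not a proposed patch): the
    // empty-poll requirement is dropped.
    static boolean hasExpiredRecordsIgnoringPoll(int cached, long elapsedMs, long pollTimeoutMs) {
        return cached > 0 && elapsedMs >= pollTimeoutMs;
    }

    // Assumes one poll per second, each returning recordsPerPoll records.
    // Returns the elapsed time in ms at which the first batch is dispatched.
    static long firstDispatchMs(int recordsPerPoll, int maxPollRecords,
                                long pollTimeoutMs, boolean requireEmptyPoll) {
        if (recordsPerPoll <= 0) {
            throw new IllegalArgumentException("recordsPerPoll must be positive");
        }
        int cached = 0;
        long elapsedMs = 0;
        while (true) {
            elapsedMs += 1000;
            boolean expired = requireEmptyPoll
                    ? hasExpiredRecords(cached, recordsPerPoll, elapsedMs, pollTimeoutMs)
                    : hasExpiredRecordsIgnoringPoll(cached, elapsedMs, pollTimeoutMs);
            if (expired) {
                return elapsedMs; // incomplete batch dispatched on timeout
            }
            cached += recordsPerPoll;
            if (cached >= maxPollRecords) {
                return elapsedMs; // full batch dispatched
            }
        }
    }

    public static void main(String[] args) {
        // Values from the thread: maxPollRecords=50000, pollTimeoutMs=10000.
        System.out.println(firstDispatchMs(100, 50000, 10000L, true));  // 500000
        System.out.println(firstDispatchMs(50, 50000, 10000L, true));   // 1000000
        System.out.println(firstDispatchMs(100, 50000, 10000L, false)); // 10000
    }
}
```

In this model, with 50-100 records arriving in every poll, the quoted predicate never fires and the first dispatch happens only when the batch is full, after 500 to 1000 seconds, which matches the wait reported in the thread. The hypothetical variant without the empty-poll requirement dispatches at the 10 second timeout.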