This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push:
new 0b7654b0841 fix(KafkaRecordBatchingProcessor): improve handling of
expired records and batch processing (#18343)
0b7654b0841 is described below
commit 0b7654b08410e377ebe754da250f9441789c31ef
Author: Adithya Kashyap H M <[email protected]>
AuthorDate: Thu Jun 12 11:19:45 2025 +0530
fix(KafkaRecordBatchingProcessor): improve handling of expired records and
batch processing (#18343)
Co-authored-by: Adithya Kashyap H M <[email protected]>
---
.../consumer/support/batching/KafkaRecordBatchingProcessor.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 09247cddc39..ccf4b5478c2 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -110,6 +110,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
timeoutWatch.takenAndRestart();
}
+ // If timeout has expired, process current batch but continue to
handle new records
if (hasExpiredRecords(consumerRecords)) {
LOG.debug(
"The polling timeout has expired with {} records in cache.
Dispatching the incomplete batch for processing",
@@ -118,9 +119,10 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
// poll timeout has elapsed, so check for expired records
processBatch(camelKafkaConsumer);
exchangeList.clear();
- return ProcessingResult.newUnprocessed();
+ timeoutWatch.takenAndRestart(); // restart timer after processing
expired batch
}
+ // Always add new records after handling any expiration
for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
TopicPartition tp = new TopicPartition(consumerRecord.topic(),
consumerRecord.partition());
Exchange childExchange = toExchange(camelKafkaConsumer, tp,
consumerRecord);
@@ -130,6 +132,7 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
if (exchangeList.size() >= configuration.getMaxPollRecords()) {
processBatch(camelKafkaConsumer);
exchangeList.clear();
+ timeoutWatch.takenAndRestart(); // restart timer after batch
processed
}
}