This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new b42f5a4fcb6 CAMEL-18350: backport the fix for Kafka "breakOnFirst" 
error (#8451)
b42f5a4fcb6 is described below

commit b42f5a4fcb6ea7e72fb7e597c1a26365c6cb8fa2
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Thu Sep 29 17:40:31 2022 +0200

    CAMEL-18350: backport the fix for Kafka "breakOnFirst" error (#8451)
---
 .../apache/camel/component/kafka/KafkaFetchRecords.java | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index db425700532..f0f9413880a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -156,8 +156,7 @@ class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+        KafkaRecordProcessor.ProcessResult lastResult = null;
         try {
             /*
              * We lock the processing of the record to avoid raising a 
WakeUpException as a result to a call
@@ -179,7 +178,7 @@ class KafkaFetchRecords implements Runnable {
 
                 processAsyncCommits();
 
-                partitionLastOffset = processPolledRecords(allRecords, 
kafkaRecordProcessor);
+                lastResult = processPolledRecords(allRecords, 
kafkaRecordProcessor, lastResult);
             }
 
             if (!isConnected()) {
@@ -213,7 +212,7 @@ class KafkaFetchRecords implements Runnable {
                         e.getClass().getName(), threadId, getPrintableTopic(), 
lastProcessedOffset, e.getMessage());
             }
 
-            handleAccordingToStrategy(partitionLastOffset, e);
+            handleAccordingToStrategy(lastResult.getPartitionLastOffset(), e);
         } finally {
             lock.unlock();
 
@@ -338,16 +337,18 @@ class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && 
!kafkaConsumer.isRunAllowed();
     }
 
-    private long processPolledRecords(ConsumerRecords<Object, Object> 
allRecords, KafkaRecordProcessor kafkaRecordProcessor) {
+    private KafkaRecordProcessor.ProcessResult processPolledRecords(
+            ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor 
kafkaRecordProcessor,
+            KafkaRecordProcessor.ProcessResult resultFromPreviousPoll) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
-        KafkaRecordProcessor.ProcessResult lastResult = 
KafkaRecordProcessor.ProcessResult.newUnprocessed();
+        KafkaRecordProcessor.ProcessResult lastResult
+                = resultFromPreviousPoll == null ? 
KafkaRecordProcessor.ProcessResult.newUnprocessed() : resultFromPreviousPoll;
 
         while (partitionIterator.hasNext() && !isStopping()) {
-            lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
             TopicPartition partition = partitionIterator.next();
 
             List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords.records(partition);
@@ -377,7 +378,7 @@ class KafkaFetchRecords implements Runnable {
             setRetry(false); // to close the current consumer
         }
 
-        return lastResult.getPartitionLastOffset();
+        return lastResult;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> 
partitionRecords, TopicPartition partition) {

Reply via email to