This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 7b082074390 CAMEL-18350: fix bug causing all messages to be reconsumed 
(#8447)
7b082074390 is described below

commit 7b0820743900553727c6eb7abc2610bdaf40f7fa
Author: klease <38634989+kle...@users.noreply.github.com>
AuthorDate: Thu Sep 29 09:47:19 2022 +0200

    CAMEL-18350: fix bug causing all messages to be reconsumed (#8447)
    
    When "breakOnFirstError" = "true", if an error occurs on the first message
    in a set of polled messages, then the offset is set to -1, causing
    all messages to be refetched and not only the one which failed.
    Modify KafkaFetchRecords to remember the result returned from
    the processing of the previous batch when processing a new batch
    of polled messages.
    
    (cherry picked from commit 853477a2d0e38dcbc0d399baa3c3e4a56c973b42)
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java   | 5 ++++-
 .../kafka/consumer/support/KafkaRecordProcessorFacade.java         | 7 ++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 8cab5bcd76a..5684375bbdd 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -307,6 +307,7 @@ public class KafkaFetchRecords implements Runnable {
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+            ProcessingResult lastResult = null;
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && 
pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -315,13 +316,15 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
-                ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
+                ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords, lastResult);
 
                 if (result.isBreakOnErrorHit()) {
                     LOG.debug("We hit an error ... setting flags to force 
reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
+                } else {
+                    lastResult = result;
                 }
 
                 updateTaskState();
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 53e519523e7..fbf6f3d09a8 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,16 +54,17 @@ public class KafkaRecordProcessorFacade {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(ConsumerRecords<Object, 
Object> allRecords) {
+    public ProcessingResult processPolledRecords(
+            ConsumerRecords<Object, Object> allRecords, ProcessingResult 
resultFromPreviousPoll) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
-        ProcessingResult lastResult = ProcessingResult.newUnprocessed();
+        ProcessingResult lastResult
+                = resultFromPreviousPoll == null ? 
ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
 
         while (partitionIterator.hasNext() && !isStopping()) {
-            lastResult = ProcessingResult.newUnprocessed();
             TopicPartition partition = partitionIterator.next();
 
             List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords.records(partition);

Reply via email to