This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new b494de6956c CAMEL-20044: fix to handle commit on breakOnFirstError 
(#11959)
b494de6956c is described below

commit b494de6956ca31121531759b9fca4b1c76efdc26
Author: Mike Barlotta <codesm...@users.noreply.github.com>
AuthorDate: Wed Nov 15 12:08:28 2023 -0500

    CAMEL-20044: fix to handle commit on breakOnFirstError (#11959)
    
    * fix to handle commit on breakOnFirstError
    * add asynch commit manager test
    * update breakOnFirstError JavaDoc in KafkaConfig
    * update Kafka Component doc
    * fixed checkstyle issues
    * added support for creating topics with multiple partitions
    * add 2 unit tests needing partitions
    * fix comment in KafkaRecordProcessor
    * change topic names
    * force partitions in test
    * fix var names
    * test for breakOnFirstError false and NOOP
    
    ---------
    
    Co-authored-by: Otavio Rodolfo Piske <angusyo...@gmail.com>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc |  46 ++++-
 .../camel/component/kafka/KafkaConfiguration.java  |  13 +-
 .../camel/component/kafka/KafkaFetchRecords.java   |   6 +-
 .../consumer/support/KafkaRecordProcessor.java     |  15 +-
 .../support/KafkaRecordProcessorFacade.java        |   9 +-
 ...eakOnFirstErrorOffUsingKafkaManualCommitIT.java | 162 ++++++++++++++++++
 .../KafkaBreakOnFirstErrorReplayOldMessagesIT.java | 190 +++++++++++++++++++++
 .../KafkaBreakOnFirstErrorSeekIssueIT.java         | 171 +++++++++++++++++++
 ...stErrorWithBatchUsingAsynchCommitManagerIT.java | 155 +++++++++++++++++
 ...irstErrorWithBatchUsingKafkaManualCommitIT.java | 172 +++++++++++++++++++
 ...rrorWithBatchUsingKafkaManualCommitRetryIT.java | 176 +++++++++++++++++++
 ...rstErrorWithBatchUsingSynchCommitManagerIT.java | 155 +++++++++++++++++
 .../component/kafka/testutil/CamelKafkaUtil.java   |  57 +++++++
 .../ROOT/pages/camel-3x-upgrade-guide-3_21.adoc    |  16 ++
 14 files changed, 1326 insertions(+), 17 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index f1ff83cf551..0c6423c810a 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -85,6 +85,24 @@ Camel components have.
For advanced control, a custom implementation of
`org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
on the component level, which allows controlling which exceptions cause which
of the strategies above.
 
+== Consumer error handling (advanced)
+
+By default, Camel will poll using the *ERROR_HANDLER* to process exceptions.
+How Camel handles a message that results in an exception can be altered using the `breakOnFirstError` attribute in the configuration.
+Instead of continuing to poll the next message, Camel will commit the offset so that the message that caused the exception will be retried.
+This is similar to the *RETRY* polling strategy above.
+
+[source,java]
+----
+KafkaComponent kafka = new KafkaComponent();
+kafka.setBreakOnFirstError(true);
+...
+camelContext.addComponent("kafka", kafka);
+----
+
+It is recommended that you read the section below "Using manual commit with 
Kafka consumer" to understand how `breakOnFirstError`
+will work based on the `CommitManager` that is configured.
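+
+The option can also be set directly on the endpoint URI, as the new integration tests in this commit do. A minimal sketch (the topic name, group id, and log endpoint are illustrative):
+
+[source,java]
+----
+from("kafka:my-topic?groupId=my-group&breakOnFirstError=true")
+    .to("log:consumed");
+----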
+
 == Samples
 
 === Consuming messages from Kafka
@@ -351,12 +369,15 @@ or on the endpoint, for example:
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
+kafka.setAutoCommitEnable(false);
 kafka.setAllowManualCommit(true);
 ...
 camelContext.addComponent("kafka", kafka);
 ----
 
-Then you can use the `KafkaManualCommit` from Java code such as a Camel 
`Processor`:
+By default this will use the `NoopCommitManager` behind the scenes. In order to commit an offset, you will need to use the `KafkaManualCommit` from Java code such as a Camel `Processor`:
+
 [source,java]
 ----
 public void process(Exchange exchange) {
@@ -366,7 +387,7 @@ public void process(Exchange exchange) {
 }
 ----
 
-This will force a synchronous commit which will block until the commit is 
acknowledged on Kafka, or if it fails an exception is thrown.
+The `KafkaManualCommit` will force a synchronous commit which blocks until the commit is acknowledged on Kafka; if it fails, an exception is thrown.
 You can use an asynchronous commit as well by configuring the 
`KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` 
implementation.
 
Then the commit will be done in the next consumer loop using the Kafka asynchronous commit API.
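
A minimal sketch of that asynchronous configuration, assuming the same component setup as above:

[source,java]
----
KafkaComponent kafka = new KafkaComponent();
kafka.setAutoCommitEnable(false);
kafka.setAllowManualCommit(true);
// switch manual commits to the asynchronous implementation
kafka.setKafkaManualCommitFactory(new DefaultKafkaManualAsyncCommitFactory());
camelContext.addComponent("kafka", kafka);
----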
@@ -374,6 +395,27 @@ Then the commit will be done in the next consumer loop 
using the kafka asynchron
 If you want to use a custom implementation of `KafkaManualCommit` then you can 
configure a custom `KafkaManualCommitFactory`
 on the `KafkaComponent` that creates instances of your custom implementation.
 
+When configuring a consumer to use manual commit and a specific `CommitManager`, it is important to understand how these influence the behavior of `breakOnFirstError`.
+
+[source,java]
+----
+KafkaComponent kafka = new KafkaComponent();
+kafka.setAutoCommitEnable(false);
+kafka.setAllowManualCommit(true);
+kafka.setBreakOnFirstError(true);
+kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory());
+...
+camelContext.addComponent("kafka", kafka);
+----
+
+When the `CommitManager` is left as the default `NoopCommitManager`, `breakOnFirstError` will not automatically commit the offset when a message fails. The consumer must manage offset commits in the route using `KafkaManualCommit`; without a commit, the failing message will be retried.
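+
+A minimal sketch of such a `Processor`, mirroring the new integration tests in this commit:
+
+[source,java]
+----
+public void process(Exchange exchange) {
+    // commit the offset explicitly once the message has been handled
+    KafkaManualCommit manual = exchange.getMessage()
+            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+    manual.commit();
+}
+----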
+
+
+When the `CommitManager` is changed to either the synch or asynch commit manager, `breakOnFirstError` will automatically commit the offset so that the message with an error is retried. The message will be retried continually until it can be processed without an error.
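+
+One way to avoid endlessly retrying a poison message is to handle the exception in the route and commit the offset explicitly; the new integration tests in this commit use this pattern with the NOOP commit manager. A sketch:
+
+[source,java]
+----
+onException(Exception.class)
+        .handled(false)
+        .process(exchange -> {
+            // committing here moves the consumer past the poison message
+            KafkaManualCommit manual = exchange.getMessage()
+                    .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            if (manual != null) {
+                manual.commit();
+            }
+        });
+----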
+
 *Note 1*: records from a partition must be processed and committed by the same 
thread as the consumer. This means that certain EIPs, async or concurrent 
operations
in the DSL, may cause the commit to fail. In such circumstances, trying to
commit the transaction will cause the Kafka client to throw a
`java.util.ConcurrentModificationException`
 exception with the message `KafkaConsumer is not safe for multi-threaded 
access`. To prevent this from happening, redesign your route to avoid those 
operations.
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 38ee58318a8..cd279679980 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -819,10 +819,15 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     /**
     * This option controls what happens when a consumer is processing an exchange and it fails. If the option is
     * <tt>false</tt> then the consumer continues to the next message and processes it. If the option is <tt>true</tt>
-     * then the consumer breaks out, and will seek back to offset of the 
message that caused a failure, and then
-     * re-attempt to process this message. However this can lead to endless 
processing of the same message if its bound
-     * to fail every time, eg a poison message. Therefore its recommended to 
deal with that for example by using Camel's
-     * error handler.
+     * then the consumer breaks out.
+     * 
+     * Using the default NoopCommitManager will cause the consumer not to commit the offset, so that the message is
+     * re-attempted. The consumer should use the KafkaManualCommit to determine the best way to handle the message.
+     * 
+     * Using either the SynchCommitManager or the AsynchCommitManager, the consumer will seek back to the offset of the
+     * message that caused a failure, and then re-attempt to process this message. However, this can lead to endless
+     * processing of the same message if it is bound to fail every time, e.g. a poison message. Therefore it is
+     * recommended to deal with that, for example by using Camel's error handler.
      */
     public void setBreakOnFirstError(boolean breakOnFirstError) {
         this.breakOnFirstError = breakOnFirstError;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index a213dbc1fe8..e13f7d86e99 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -332,7 +332,6 @@ public class KafkaFetchRecords implements Runnable {
                         LOG.trace("This polling iteration is using lastresult 
on partition {} and offset {}",
                                 lastResult.getPartition(), 
lastResult.getPartitionLastOffset());
                     }
-
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("This polling iteration is using lastresult 
of null");
@@ -346,7 +345,6 @@ public class KafkaFetchRecords implements Runnable {
                         LOG.trace("This polling iteration had a result 
returned for partition {} and offset {}",
                                 result.getPartition(), 
result.getPartitionLastOffset());
                     }
-
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("This polling iteration had a result 
returned as null");
@@ -354,7 +352,7 @@ public class KafkaFetchRecords implements Runnable {
                 }
 
                 updateTaskState();
-                if (result.isBreakOnErrorHit() && 
!this.state.equals(State.PAUSED)) {
+                if (result != null && result.isBreakOnErrorHit() && 
!this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force 
reconnect");
                     // force re-connect
                     setReconnect(true);
@@ -363,7 +361,7 @@ public class KafkaFetchRecords implements Runnable {
                     lastResult = result;
 
                     if (LOG.isTraceEnabled()) {
-                        LOG.trace("setting lastresult to partition {} and 
offset {}",
+                        LOG.trace("Setting lastresult to partition {} and 
offset {}",
                                 lastResult.getPartition(), 
lastResult.getPartitionLastOffset());
                     }
                 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 97875b097f1..e731e4c4763 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -63,7 +63,7 @@ public class KafkaRecordProcessor {
             message.setHeader(KafkaConstants.KEY, record.key());
         }
 
-        LOG.debug("setting up the exchange for message from partition {} and 
offset {}",
+        LOG.debug("Setting up the exchange for message from partition {} and 
offset {}",
                 record.partition(), record.offset());
 
         message.setBody(record.value());
@@ -115,13 +115,11 @@ public class KafkaRecordProcessor {
             exchange.setException(e);
         }
         if (exchange.getException() != null) {
-
             LOG.debug("An exception was thrown for record at partition {} and 
offset {}",
                     record.partition(), record.offset());
 
             boolean breakOnErrorExit = processException(exchange, 
topicPartition, record, lastResult,
                     exceptionHandler);
-
             return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
         } else {
             return new ProcessingResult(false, record.partition(), 
record.offset(), exchange.getException() != null);
@@ -135,7 +133,6 @@ public class KafkaRecordProcessor {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
-
             if (lastResult.getPartition() != -1 &&
                     lastResult.getPartition() != record.partition()) {
                 LOG.error("About to process an exception with UNEXPECTED 
partition & offset. Got topic partition {}. " +
@@ -150,13 +147,19 @@ public class KafkaRecordProcessor {
                 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
                         exc.getMessage());
                 LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-                        lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+                        record.offset(), record.partition());
             }
 
             // force commit, so we resume on next poll where we failed 
             // except when the failure happened at the first message in a poll
             if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-                commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+                // we should just do a commit (vs the original forceCommit)
+                // when the route uses the NOOP commit manager it will rely
+                // on the route implementation to explicitly commit the offset
+                // when the route uses the Synch/Asynch commit manager it will
+                // ALWAYS commit the offset for the failing record
+                // and will ALWAYS retry it
+                commitManager.commit(topicPartition);
             }
 
             // continue to next partition
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 134246891fb..44573daa60d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -61,12 +61,16 @@ public class KafkaRecordProcessorFacade {
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
+        LOG.debug("Poll received records on {} partitions", partitions.size());
+
         ProcessingResult lastResult
                 = resultFromPreviousPoll == null ? 
ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
 
         while (partitionIterator.hasNext() && !isStopping()) {
             TopicPartition partition = partitionIterator.next();
 
+            LOG.debug("Processing records on partition {}", 
partition.partition());
+
             List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords.records(partition);
             Iterator<ConsumerRecord<Object, Object>> recordIterator = 
partitionRecords.iterator();
 
@@ -75,10 +79,13 @@ public class KafkaRecordProcessorFacade {
             while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() 
&& !isStopping()) {
                 ConsumerRecord<Object, Object> record = recordIterator.next();
 
+                LOG.debug("Processing record on partition {} with offset {}", 
record.partition(), record.offset());
+
                 lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
                         kafkaRecordProcessor, record);
 
-                LOG.debug("Processed record on partition {} and offset {} and 
got result for partition {} and offset {}",
+                LOG.debug(
+                        "Processed record on partition {} with offset {} and 
got ProcessingResult for partition {} and offset {}",
                         record.partition(), record.offset(), 
lastResult.getPartition(), lastResult.getPartitionLastOffset());
 
                 if (consumerListener != null) {
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
new file mode 100644
index 00000000000..cd0d48e2394
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * This tests basic breakOnFirstError functionality when it is turned off and the consumer uses allowManualCommit with
+ * KafkaManualCommit and the NOOP commit manager.
+ */
+class KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT extends 
BaseEmbeddedKafkaTestSupport {
+    public static final String ROUTE_ID = "breakOnFirstErrorOff";
+    public static final String TOPIC = "breakOnFirstErrorOff";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=breakOnFirstErrorOff"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    // set breakOnFirstError (BOFE) to false
+                    + "&breakOnFirstError=false"
+                    + "&maxPollRecords=1"
+                    + "&pollTimeoutMs=1000"
+                    + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * demonstrates that the consumer moves past a message that is in error when breakOnFirstError is false
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+        to.reset();
+        to.expectedMessageCount(4);
+
+        // message-3 and message-4 were never committed
+        // by the consumer route
+        // but we moved past them anyway
+        // because breakOnFirstError was false
+        // then when we encounter a successful message
+        // we commit that one and keep going
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-5");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> to.getExchanges().size() > 3);
+
+        to.assertIsSatisfied(3000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .process(exchange -> {
+                            
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .to(to)
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        });
+            }
+        };
+    }
+
+    private void publishMessagesToKafka() {
+        for (int i = 0; i < 6; i++) {
+            String msg = "message-" + i;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
null, msg);
+            producer.send(data);
+        }
+    }
+
+    private void doCommitOffset(Exchange exchange) {
+        LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, 
true));
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        assertNotNull(manual);
+        manual.commit();
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("message-3") || payload.equals("message-4")) {
+            throw new RuntimeException("ERROR TRIGGERED BY TEST");
+        }
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
new file mode 100644
index 00000000000..e8ae0f732cc
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This tests breakOnFirstError functionality and the issue surfaced in CAMEL-20044, where incorrectly
+ * handling the offset commit resulted in replaying messages.
+ * 
+ * Mimics the reproduction of the problem in https://github.com/CodeSmell/CamelKafkaOffset
+ */
+class KafkaBreakOnFirstErrorReplayOldMessagesIT extends 
BaseEmbeddedKafkaTestSupport {
+
+    public static final String ROUTE_ID = "breakOnFirstError-20044";
+    public static final String TOPIC = "breakOnFirstError-20044";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorReplayOldMessagesIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=1"
+                    // here multiple consumer threads were an issue
+                    + "&consumersCount=3"
+                    + "&pollTimeoutMs=1000"
+                    + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeAll
+    public static void setupTopic() {
+        AdminClient kafkaAdminClient = createAdminClient(service);
+
+        // create the topic w/ 3 partitions
+        final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1);
+        kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+    }
+
+    @BeforeEach
+    public void init() {
+
+        // setup the producer
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Test
+    void testCamel20044TestFix() throws Exception {
+        to.reset();
+        to.expectedMessageCount(13);
+        to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR",
+                "6", "7", "ERROR", "8", "9", "10", "11");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        // let the test run for a while
+        Awaitility.await()
+                .timeout(10, TimeUnit.SECONDS)
+                .pollDelay(8, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(true));
+
+        to.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                onException(RuntimeException.class)
+                        .handled(false)
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        })
+                        .end();
+
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .autoStartup(false)
+                        .process(exchange -> {
+                            
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        // capturing all of the payloads
+                        .to(to)
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        })
+                        .end();
+            }
+        };
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("ERROR")) {
+            throw new RuntimeException("NON RETRY ERROR TRIGGERED BY TEST");
+        }
+    }
+
+    private void publishMessagesToKafka() {
+        final List<String> producedRecords = List.of("1", "2", "3", "4", "5", 
"ERROR",
+                "6", "7", "ERROR", "8", "9", "10", "11");
+
+        producedRecords.forEach(v -> {
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
null, v);
+            producer.send(data);
+        });
+
+    }
+
+    private void doCommitOffset(Exchange exchange) {
+        LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, 
true));
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        if (Objects.nonNull(manual)) {
+            manual.commit();
+        } else {
+            LOG.error("KafkaManualCommit is MISSING");
+        }
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
new file mode 100644
index 00000000000..b1ebc51a8a6
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This tests breakOnFirstError functionality and the issue surfaced in CAMEL-19894 regarding failure to
+ * correctly commit the offset in a batch when using the Synch commit manager.
+ * 
+ * Mimics the reproduction of the problem in https://github.com/Krivda/camel-bug-reproduction
+ */
+class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String ROUTE_ID = "breakOnFirstError-19894";
+    public static final String TOPIC = "breakOnFirstError-19894";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorSeekIssueIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=8"
+                    + "&pollTimeoutMs=1000"
+                    + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory"
+                    + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeAll
+    public static void setupTopic() {
+        AdminClient kafkaAdminClient = createAdminClient(service);
+
+        // create the topic w/ 2 partitions
+        final NewTopic mytopic = new NewTopic(TOPIC, 2, (short) 1);
+        kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+    }
+
+    @BeforeEach
+    public void init() {
+
+        // setup the producer
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Test
+    void testCamel19894TestFix() throws Exception {
+        to.reset();
+        // will consume the payloads from partition 0
+        // and will continually retry the payload with "5"
+        to.expectedMessageCount(4);
+        to.expectedBodiesReceived("1", "2", "3", "4");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        // let the test run for a while
+        Awaitility.await()
+                .timeout(10, TimeUnit.SECONDS)
+                .pollDelay(8, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(true));
+
+        // the replaying of the message with an error
+        // will prevent other payloads from being
+        // processed
+        to.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .autoStartup(false)
+                        .process(exchange -> {
+                            
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsFifthRecordThrowException(exchange);
+                        })
+                        .to(to)
+                        .end();
+            }
+        };
+    }
+
+    private void ifIsFifthRecordThrowException(Exchange e) {
+        if (e.getMessage().getBody().equals("5")) {
+            throw new RuntimeException("ERROR_TRIGGERED_BY_TEST");
+        }
+    }
+
+    private void publishMessagesToKafka() {
+        final List<String> producedRecordsPartition1 = List.of("5", "6", "7", 
"8", "9", "10", "11");
+        final List<String> producedRecordsPartition0 = List.of("1", "2", "3", 
"4");
+
+        producedRecordsPartition1.forEach(v -> {
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
1, null, null, v);
+            producer.send(data);
+        });
+
+        producedRecordsPartition0.forEach(v -> {
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
0, null, null, v);
+            producer.send(data);
+        });
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
new file mode 100644
index 00000000000..6ea0a467902
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This tests basic breakOnFirstError functionality using allowManualCommit and the Asynch commit manager, which
+ * allows Camel to handle when to commit an offset.
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT extends 
BaseEmbeddedKafkaTestSupport {
+    public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
+    public static final String TOPIC = "breakOnFirstErrorBatchIT";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.class);
+
+    private final List<String> errorPayloads = new CopyOnWriteArrayList<>();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=3"
+                    + "&pollTimeoutMs=1000"
+                    + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    // asynch commit factory
+                    + 
"&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory"
+                    + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * will continue to retry the message that is in error
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+        to.reset();
+        to.expectedMessageCount(3);
+        // message-3 causes an error 
+        // and breakOnFirstError will cause it to be retried forever
+        // we will never get to message-4
+        to.expectedBodiesReceived("message-0", "message-1", "message-2");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> errorPayloads.size() > 3);
+
+        to.assertIsSatisfied();
+
+        for (String payload : errorPayloads) {
+            assertEquals("message-3", payload);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .process(exchange -> {
+                            
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .to(to)
+                        .end();
+            }
+        };
+    }
+
+    private void publishMessagesToKafka() {
+        for (int i = 0; i < 5; i++) {
+            String msg = "message-" + i;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
null, msg);
+            producer.send(data);
+        }
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("message-3")) {
+            errorPayloads.add(payload);
+            throw new RuntimeException("ERROR TRIGGERED BY TEST");
+        }
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
new file mode 100644
index 00000000000..61b2b894682
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * This tests basic breakOnFirstError functionality using allowManualCommit and KafkaManualCommit. Because it relies on
+ * the Camel default NOOP commit manager, the route implementation MUST manage all offset commits.
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT extends 
BaseEmbeddedKafkaTestSupport {
+    public static final String ROUTE_ID = 
"breakOnFirstErrorBatchOnExceptionIT";
+    public static final String TOPIC = "breakOnFirstErrorBatchOnExceptionIT";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=3"
+                    + "&pollTimeoutMs=1000"
+                    + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> 
producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * will continue to retry the message that is in error
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+        to.reset();
+        to.expectedMessageCount(7);
+
+        // old behavior before the fix in CAMEL-20044
+        // message-3 causes an error 
+        // and breakOnFirstError will cause it to be retried 1x
+        // then we move on
+        //to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-3", "message-4", "message-5");
+
+        // new behavior w/ NOOP Commit Manager
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4", "message-5");
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> to.getExchanges().size() > 5);
+
+        to.assertIsSatisfied(3000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                onException(Exception.class)
+                        .handled(false)
+                        // adding error message to end
+                        // so we can account for it
+                        .to(to)
+                        .process(exchange -> {
+                            // if we don't commit 
+                            // camel will continuously 
+                            // retry the message with an error
+                            doCommitOffset(exchange);
+                        });
+
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .process(exchange -> {
+                            
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .to(to)
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        });
+            }
+        };
+    }
+
+    private void publishMessagesToKafka() {
+        for (int i = 0; i < 6; i++) {
+            String msg = "message-" + i;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 
null, msg);
+            producer.send(data);
+        }
+    }
+
+    private void doCommitOffset(Exchange exchange) {
+        LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, 
true));
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        assertNotNull(manual);
+        manual.commit();
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("message-3")) {
+            throw new RuntimeException("ERROR TRIGGERED BY TEST");
+        }
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
new file mode 100644
index 00000000000..39c1fa1c511
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * This tests basic breakOnFirstError functionality using allowManualCommit and KafkaManualCommit. Because it relies on
+ * the Camel default NOOP commit manager, the route implementation MUST manage all offset commits.
+ * 
+ * Demonstrates how to retry a message that is in error.
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT extends 
BaseEmbeddedKafkaTestSupport {
+
+    public static final String ROUTE_ID = "breakOnFirstErrorBatchRetryIT";
+    public static final String TOPIC = "breakOnFirstErrorBatchRetryIT";
+
+    private static final Logger LOG
+            = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=3"
+                    + "&pollTimeoutMs=1000"
+                    + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * will continue to retry the message that is in error
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapabilityRetry() throws Exception {
+        to.reset();
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> to.getExchanges().size() > 7);
+
+        assertFalse(to.getExchanges().stream()
+                .anyMatch(exc -> "message-4".equals(exc.getMessage().getBody(String.class))));
+
+        assertFalse(to.getExchanges().stream()
+                .anyMatch(exc -> "message-5".equals(exc.getMessage().getBody(String.class))));
+
+        assertTrue(to.getExchanges().stream()
+                .filter(exc -> "message-3".equals(exc.getMessage().getBody(String.class)))
+                .count() > 1);
+
+        to.assertIsSatisfied(3000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                onException(Exception.class)
+                        .handled(false)
+                        // adding error message to end
+                        // so we can account for it
+                        .to(to)
+                        // we are not 
+                        // going to commit offset
+                        // so will retry
+                        .end();
+
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .process(exchange -> {
+                            LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .to(to)
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        });
+            }
+        };
+    }
+
+    private void publishMessagesToKafka() {
+        for (int i = 0; i < 6; i++) {
+            String msg = "message-" + i;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg);
+            producer.send(data);
+        }
+    }
+
+    private void doCommitOffset(Exchange exchange) {
+        LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true));
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+        assertNotNull(manual);
+        manual.commit();
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("message-3")) {
+            throw new RuntimeException("ERROR TRIGGERED BY TEST");
+        }
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java
new file mode 100644
index 00000000000..0fabfe01587
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * This will test basic breakOnFirstError functionality. It uses allowManualCommit and sets the Synch Commit Manager,
+ * which allows Camel to handle when to commit an offset.
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport {
+    public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
+    public static final String TOPIC = "breakOnFirstErrorBatchIT";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.class);
+
+    private final List<String> errorPayloads = new CopyOnWriteArrayList<>();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=3"
+                    + "&pollTimeoutMs=1000"
+                    + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    // synch commit factory
+                    + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory"
+                    + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * will continue to retry the message that is in error
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+        to.reset();
+        to.expectedMessageCount(3);
+        // message-3 causes an error 
+        // and breakOnFirstError will cause it to be retried forever
+        // we will never get to message-4
+        to.expectedBodiesReceived("message-0", "message-1", "message-2");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> errorPayloads.size() > 3);
+
+        to.assertIsSatisfied();
+
+        for (String payload : errorPayloads) {
+            assertEquals("message-3", payload);
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .routeId(ROUTE_ID)
+                        .process(exchange -> {
+                            LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .to(to)
+                        .end();
+            }
+        };
+    }
+
+    private void publishMessagesToKafka() {
+        for (int i = 0; i < 5; i++) {
+            String msg = "message-" + i;
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg);
+            producer.send(data);
+        }
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("message-3")) {
+            errorPayloads.add(payload);
+            throw new RuntimeException("ERROR TRIGGERED BY TEST");
+        }
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java
new file mode 100644
index 00000000000..2acdb9746ab
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.testutil;
+
+import java.util.Objects;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
+
+public final class CamelKafkaUtil {
+
+    private CamelKafkaUtil() {
+    }
+
+    public static String buildKafkaLogMessage(String msg, Exchange exchange, boolean includeBody) {
+        String eol = "\n";
+
+        StringBuilder sb = new StringBuilder();
+        if (Objects.nonNull(msg)) {
+            sb.append(msg);
+            sb.append(eol);
+        }
+
+        sb.append("Message consumed from ");
+        sb.append(exchange.getMessage().getHeader(KafkaConstants.TOPIC, String.class));
+        sb.append(eol);
+        sb.append("The Partition:Offset is ");
+        sb.append(exchange.getMessage().getHeader(KafkaConstants.PARTITION, String.class));
+        sb.append(":");
+        sb.append(exchange.getMessage().getHeader(KafkaConstants.OFFSET, String.class));
+        sb.append(eol);
+        sb.append("The Key is ");
+        sb.append(exchange.getMessage().getHeader(KafkaConstants.KEY, String.class));
+
+        if (includeBody) {
+            sb.append(eol);
+            sb.append(exchange.getMessage().getBody(String.class));
+        }
+
+        return sb.toString();
+    }
+
+}
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc
index 4e6f65b78b8..4fc6dacf057 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc
@@ -4,6 +4,22 @@ This document is for helping you upgrade your Apache Camel application
 from Camel 3.x to 3.y. For example if you are upgrading Camel 3.0 to 3.2, then you should follow the guides
 from both 3.0 to 3.1 and 3.1 to 3.2.
 
+== Upgrading Camel 3.21 to 3.21.3
+
+=== camel-kafka
+
+The behavior of `breakOnFirstError` was altered as numerous issues were fixed. The behavior related to committing
+the offset is now determined by the `CommitManager` that is configured.
+
+When the default `CommitManager` is used (`NoopCommitManager`) then no commit is performed. The route implementation
+is responsible for managing the offset, using `KafkaManualCommit` to commit and to control retrying of the payload,
+as shown in the sketch below.
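+
+For example, with the NOOP Commit Manager the route itself commits the offset after successful processing. A minimal
+sketch (the topic name and processing logic are illustrative; it mirrors the integration tests added in this change):
+
+[source,java]
+----
+from("kafka:my-topic?autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true")
+    .process(exchange -> {
+        // process the payload; an exception thrown here triggers
+        // breakOnFirstError handling and the offset is NOT committed
+    })
+    .process(exchange -> {
+        // on success, commit the offset explicitly
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+        manual.commit();
+    });
+----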
+
+When using the `SyncCommitManager` or the `AsyncCommitManager` the offset will be committed so that the payload is
+continually retried. This was the behavior described in the documentation.
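+
+The Synch Commit Manager, for example, is selected through the manual commit factory, as in the integration tests
+added by this change (a sketch; the topic and the other endpoint options are illustrative):
+
+[source,java]
+----
+from("kafka:my-topic?autoCommitEnable=false"
+     + "&allowManualCommit=true"
+     + "&breakOnFirstError=true"
+     // synch commit factory
+     + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    .to("log:received");
+----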
+
 == Upgrading Camel 3.20 to 3.21
 
 === camel-core
