This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9bdbf7b6b394561058a7ef7198997244f43929d0
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Jan 19 11:40:26 2022 +0100

    CAMEL-15562: created a commit manager in camel-kafka
    
    This reorganizes the commit management code so that we can unify the
    management of the different commit strategies.
---
 .../component/kafka/KafkaComponentConfigurer.java  |   4 +-
 .../component/kafka/KafkaEndpointConfigurer.java   |   4 +-
 .../org/apache/camel/component/kafka/kafka.json    |   4 +-
 .../camel/component/kafka/KafkaComponent.java      |   2 +
 .../camel/component/kafka/KafkaConfiguration.java  |   1 +
 .../camel/component/kafka/KafkaEndpoint.java       |   2 +
 .../camel/component/kafka/KafkaFetchRecords.java   |  39 ++---
 ...kaAsyncManualCommit.java => CommitManager.java} |  27 ++-
 .../kafka/consumer/DefaultCommitManager.java       | 195 +++++++++++++++++++++
 .../consumer/DefaultKafkaManualAsyncCommit.java    |   3 +-
 .../DefaultKafkaManualAsyncCommitFactory.java      |   2 -
 .../kafka/consumer/DefaultKafkaManualCommit.java   |   1 -
 .../consumer/DefaultKafkaManualCommitFactory.java  |   2 -
 .../consumer/DefaultKafkaManualSyncCommit.java     |   4 +-
 .../kafka/consumer/KafkaAsyncManualCommit.java     |   2 -
 .../kafka/{ => consumer}/KafkaManualCommit.java    |   2 +-
 .../{ => consumer}/KafkaManualCommitFactory.java   |   3 +-
 .../consumer/support/KafkaRecordProcessor.java     | 131 +-------------
 .../support/KafkaRecordProcessorFacade.java        |  20 +--
 .../support/PartitionAssignmentListener.java       |  12 +-
 .../kafka/consumer/support/ProcessingResult.java   |   4 +-
 .../KafkaConsumerAsyncManualCommitIT.java          |   6 +-
 .../integration/KafkaConsumerManualCommitIT.java   |   2 +-
 .../ROOT/pages/camel-3x-upgrade-guide-3_15.adoc    |   2 +
 .../dsl/KafkaComponentBuilderFactory.java          |   9 +-
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  13 +-
 26 files changed, 285 insertions(+), 211 deletions(-)

diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index d8f6a58..0d5a1c2 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -91,7 +91,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "kafkaclientfactory":
         case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
-        case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); 
return true;
+        case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class, 
value)); return true;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": 
getOrCreateConfiguration(target).setKerberosBeforeReloginMinTime(property(camelContext,
 java.lang.Integer.class, value)); return true;
         case "kerberosinitcmd":
@@ -305,7 +305,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "kafkaclientfactory":
         case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
-        case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
+        case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": return java.lang.Integer.class;
         case "kerberosinitcmd":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 7c3ef61..270e758 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -85,7 +85,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "kafkaclientfactory":
         case "kafkaClientFactory": 
target.setKafkaClientFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
         case "kafkamanualcommitfactory":
-        case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); 
return true;
+        case "kafkaManualCommitFactory": 
target.setKafkaManualCommitFactory(property(camelContext, 
org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class, 
value)); return true;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": 
target.getConfiguration().setKerberosBeforeReloginMinTime(property(camelContext,
 java.lang.Integer.class, value)); return true;
         case "kerberosinitcmd":
@@ -291,7 +291,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "kafkaclientfactory":
         case "kafkaClientFactory": return 
org.apache.camel.component.kafka.KafkaClientFactory.class;
         case "kafkamanualcommitfactory":
-        case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
+        case "kafkaManualCommitFactory": return 
org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
         case "kerberosBeforeReloginMinTime": return java.lang.Integer.class;
         case "kerberosinitcmd":
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 4a3781f..ddaeb8b 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -61,7 +61,7 @@
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether the topic is a pattern (regular 
expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value th [...]
-    "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": 
false, "autowired": true, "secret": false, "description": "Factory to use for 
creating KafkaManualCommit instances. This allows to plugin a custom factory to 
create custom KafkaManualCommit instances in cas [...]
+    "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": true, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit instanc [...]
     "pollExceptionStrategy": { "kind": "property", "displayName": "Poll 
Exception Strategy", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, 
"autowired": true, "secret": false, "description": "To use a custom strategy 
with the consumer to control how to handle exceptions thrown from the Kafka 
broker while pooling messages." },
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify the [...]
@@ -169,7 +169,7 @@
     "valueDeserializer": { "kind": "parameter", "displayName": "Value 
Deserializer", "group": "consumer", "label": "consumer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringDeserializer", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Deserializer class for 
value t [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception 
Handler", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By default the 
con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange 
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", 
"InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the exchange pattern when the consumer creates an 
exchange." },
-    "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": 
false, "autowired": false, "secret": false, "description": "Factory to use for 
creating KafkaManualCommit instances. This allows to plugin a custom factory to 
create custom KafkaManualCommit instances in c [...]
+    "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka 
Manual Commit Factory", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Factory to use for creating KafkaManualCommit instances. This allows to plugin 
a custom factory to create custom KafkaManualCommit insta [...]
     "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory 
Size", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "33554432", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The total bytes of memory the producer can use 
to buffer records waiting to b [...]
     "compressionCodec": { "kind": "parameter", "displayName": "Compression 
Codec", "group": "producer", "label": "producer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", 
"lz4" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "none", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This parameter allows you to specify th [...]
     "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection 
Max Idle Ms", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "540000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Close idle connections 
after the number of milliseconds specified by thi [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index abe2a0c..dde3327 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -21,6 +21,8 @@ import java.util.Map;
 import org.apache.camel.CamelContext;
 import org.apache.camel.SSLContextParametersAware;
 import 
org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 1e5d833..74a1cb1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 39efe6d..1821084 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -24,6 +24,8 @@ import org.apache.camel.Consumer;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5b529df..fe51691 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -22,12 +22,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit;
+import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.DefaultCommitManager;
 import 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
 import 
org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
@@ -53,7 +53,7 @@ class KafkaFetchRecords implements Runnable {
     private final PollExceptionStrategy pollExceptionStrategy;
     private final BridgeExceptionHandlerToErrorHandler bridge;
     private final ReentrantLock lock = new ReentrantLock();
-    private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = 
new ConcurrentLinkedQueue<>();
+    private CommitManager commitManager;
 
     private boolean retry = true;
     private boolean reconnect; // must be false at init (this is the policy 
whether to reconnect)
@@ -84,6 +84,8 @@ class KafkaFetchRecords implements Runnable {
 
                     initializeConsumer();
                     setConnected(true);
+
+                    commitManager = new DefaultCommitManager(consumer, 
kafkaConsumer, threadId, getPrintableTopic());
                 }
             } catch (Exception e) {
                 setConnected(false);
@@ -136,7 +138,7 @@ class KafkaFetchRecords implements Runnable {
     private void subscribe() {
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
                 threadId, kafkaConsumer.getEndpoint().getConfiguration(), 
consumer, lastProcessedOffset,
-                this::isRunnable);
+                this::isRunnable, commitManager);
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
@@ -166,13 +168,13 @@ class KafkaFetchRecords implements Runnable {
             }
 
             KafkaRecordProcessorFacade recordProcessorFacade = new 
KafkaRecordProcessorFacade(
-                    kafkaConsumer, lastProcessedOffset, threadId, consumer, 
asyncCommits);
+                    kafkaConsumer, lastProcessedOffset, threadId, 
commitManager);
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
             while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) 
{
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
 
-                processAsyncCommits();
+                commitManager.processAsyncCommits();
 
                 ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
 
@@ -188,14 +190,14 @@ class KafkaFetchRecords implements Runnable {
 
             if (!isConnected()) {
                 LOG.debug("Not reconnecting, check whether to auto-commit or 
not ...");
-                commit();
+                commitManager.commit();
             }
 
             safeUnsubscribe();
         } catch (InterruptException e) {
             kafkaConsumer.getExceptionHandler().handleException("Interrupted 
while consuming " + threadId + " from kafka topic",
                     e);
-            commit();
+            commitManager.commit();
 
             LOG.info("Unsubscribing {} from {}", threadId, 
getPrintableTopic());
             safeUnsubscribe();
@@ -230,12 +232,6 @@ class KafkaFetchRecords implements Runnable {
         }
     }
 
-    private void processAsyncCommits() {
-        while (!asyncCommits.isEmpty()) {
-            asyncCommits.poll().processAsyncCommit();
-        }
-    }
-
     private void handleAccordingToStrategy(long partitionLastOffset, Exception 
e) {
         PollOnError onError = pollExceptionStrategy.handleException(e);
         if (PollOnError.RETRY == onError) {
@@ -276,21 +272,6 @@ class KafkaFetchRecords implements Runnable {
         }
     }
 
-    private void commit() {
-        processAsyncCommits();
-        if 
(kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) {
-            if 
("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitAsync on stop {} from {}", threadId, 
getPrintableTopic());
-                consumer.commitAsync();
-            } else if 
("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitSync on stop {} from {}", threadId, 
getPrintableTopic());
-                consumer.commitSync();
-            } else if 
("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commit on stop {} from {} is disabled (none)", 
threadId, getPrintableTopic());
-            }
-        }
-    }
-
     private void handlePollStop() {
         // stop and terminate consumer
         LOG.warn("Requesting the consumer to stop based on polling exception 
strategy");
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
similarity index 58%
copy from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
copy to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
index 3984998..d9fdc0a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
@@ -14,16 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.camel.component.kafka.consumer;
 
-import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.Exchange;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 
-/**
- * Can be used for forcing async manual offset commit when using Kafka 
consumer.
- */
-public interface KafkaAsyncManualCommit extends KafkaManualCommit {
-    /**
-     * Used in the consumer loop to effectively call 
org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync()
-     */
-    void processAsyncCommit();
+public interface CommitManager {
+
+    KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition 
partition, ConsumerRecord<Object, Object> record);
+
+    void commitOffset(TopicPartition partition, long partitionLastOffset);
+
+    void commitOffsetForce(TopicPartition partition, long partitionLastOffset);
+
+    void commitOffsetOnStop(TopicPartition partition, long 
partitionLastOffset);
+
+    @Deprecated
+    void processAsyncCommits();
+
+    void commit();
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java
new file mode 100644
index 0000000..76ae5ea
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.spi.StateRepository;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultCommitManager implements CommitManager {
+    public static final long START_OFFSET = -1;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultCommitManager.class);
+    private final Consumer<?, ?> consumer;
+    private final KafkaConsumer kafkaConsumer;
+    private final String threadId;
+    private final String printableTopic;
+    private final KafkaConfiguration configuration;
+
+    private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = 
new ConcurrentLinkedQueue<>();
+
+    public DefaultCommitManager(Consumer<?, ?> consumer, KafkaConsumer 
kafkaConsumer, String threadId, String printableTopic) {
+        this.consumer = consumer;
+        this.kafkaConsumer = kafkaConsumer;
+        this.threadId = threadId;
+        this.printableTopic = printableTopic;
+        this.configuration = kafkaConsumer.getEndpoint().getConfiguration();
+    }
+
+    public void processAsyncCommits() {
+        while (!asyncCommits.isEmpty()) {
+            asyncCommits.poll().processAsyncCommit();
+        }
+    }
+
+    @Override
+    public KafkaManualCommit getManualCommit(
+            Exchange exchange, TopicPartition partition, 
ConsumerRecord<Object, Object> record) {
+        KafkaManualCommitFactory manualCommitFactory = 
kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+        long commitTimeoutMs = configuration.getCommitTimeoutMs();
+
+        return manualCommitFactory.newInstance(exchange, consumer, 
partition.topic(), threadId,
+                offsetRepository, partition, record.offset(), commitTimeoutMs, 
asyncCommits);
+    }
+
+    @Override
+    public void commit() {
+        processAsyncCommits();
+        if 
(kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) {
+            if ("async".equals(configuration.getAutoCommitOnStop())) {
+                LOG.info("Auto commitAsync on stop {} from {}", threadId, 
printableTopic);
+                consumer.commitAsync();
+            } else if ("sync".equals(configuration.getAutoCommitOnStop())) {
+                LOG.info("Auto commitSync on stop {} from {}", threadId, 
printableTopic);
+                consumer.commitSync();
+            } else if ("none".equals(configuration.getAutoCommitOnStop())) {
+                LOG.info("Auto commit on stop {} from {} is disabled (none)", 
threadId, printableTopic);
+            }
+        }
+    }
+
+    @Override
+    public void commitOffset(TopicPartition partition, long 
partitionLastOffset) {
+        if (partitionLastOffset == START_OFFSET) {
+            return;
+        }
+
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+
+        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
+            saveStateToOffsetRepository(partition, partitionLastOffset, 
offsetRepository);
+        }
+    }
+
+    @Override
+    public void commitOffsetOnStop(TopicPartition partition, long 
partitionLastOffset) {
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+
+        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
+            saveStateToOffsetRepository(partition, partitionLastOffset, 
offsetRepository);
+        } else {
+            // if we are stopping then react according to the configured option
+            if ("async".equals(configuration.getAutoCommitOnStop())) {
+                commitAsync(consumer, partition, partitionLastOffset);
+            } else if ("sync".equals(configuration.getAutoCommitOnStop())) {
+                commitSync(configuration, consumer, partition, 
partitionLastOffset);
+
+            } else if ("none".equals(configuration.getAutoCommitOnStop())) {
+                noCommit(partition);
+            }
+        }
+    }
+
+    @Override
+    public void commitOffsetForce(TopicPartition partition, long 
partitionLastOffset) {
+        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
+
+        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
+            saveStateToOffsetRepository(partition, partitionLastOffset, 
offsetRepository);
+        } else {
+            forceSyncCommit(configuration, consumer, partition, 
partitionLastOffset);
+        }
+    }
+
+    private void commitOffset(
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition,
+            long partitionLastOffset) {
+        long timeout = configuration.getCommitTimeoutMs();
+        consumer.commitSync(
+                Collections.singletonMap(partition, new 
OffsetAndMetadata(partitionLastOffset + 1)),
+                Duration.ofMillis(timeout));
+    }
+
+    private void forceSyncCommit(
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition, long partitionLastOffset) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: 
{}]", threadId, partition.topic(),
+                    partition.partition(), partitionLastOffset);
+        }
+
+        commitOffset(configuration, consumer, partition, partitionLastOffset);
+    }
+
+    private void noCommit(TopicPartition partition) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Auto commit on stop {} from topic {} is disabled 
(none)", threadId, partition.topic());
+        }
+    }
+
+    private void commitSync(
+            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition, long partitionLastOffset) {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Auto commitSync on stop {} from topic {}", threadId, 
partition.topic());
+        }
+
+        commitOffset(configuration, consumer, partition, partitionLastOffset);
+    }
+
+    private void commitAsync(Consumer<?, ?> consumer, TopicPartition 
partition, long partitionLastOffset) {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, 
partition.topic());
+        }
+
+        consumer.commitAsync(
+                Collections.singletonMap(partition, new 
OffsetAndMetadata(partitionLastOffset + 1)), null);
+    }
+
+    private void saveStateToOffsetRepository(
+            TopicPartition partition, long partitionLastOffset,
+            StateRepository<String, String> offsetRepository) {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Saving offset repository state {} [topic: {} partition: 
{} offset: {}]", threadId, partition.topic(),
+                    partition.partition(),
+                    partitionLastOffset);
+        }
+        offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(partitionLastOffset));
+    }
+
+    private static String serializeOffsetKey(TopicPartition topicPartition) {
+        return topicPartition.topic() + '/' + topicPartition.partition();
+    }
+
+    private static String serializeOffsetValue(long offset) {
+        return String.valueOf(offset);
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index af33b0f..c976a54 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer;
 import java.util.Collection;
 import java.util.Collections;
 
-import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -55,7 +54,7 @@ public class DefaultKafkaManualAsyncCommit extends 
DefaultKafkaManualCommit impl
 
     protected void commitAsyncOffset(
             StateRepository<String, String> offsetRepository, TopicPartition 
partition, long recordOffset) {
-        if (recordOffset != KafkaRecordProcessor.START_OFFSET) {
+        if (recordOffset != DefaultCommitManager.START_OFFSET) {
             if (offsetRepository != null) {
                 offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(recordOffset));
             } else {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
index cc8442a..792c798 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java
@@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer;
 import java.util.Collection;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaManualCommit;
-import org.apache.camel.component.kafka.KafkaManualCommitFactory;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
index dd92ee0..95da58c 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import org.apache.camel.component.kafka.KafkaManualCommit;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
index c8670ff..4b8041e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java
@@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer;
 import java.util.Collection;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaManualCommit;
-import org.apache.camel.component.kafka.KafkaManualCommitFactory;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
index 5c86240..cefb68a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java
@@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer;
 import java.time.Duration;
 import java.util.Collections;
 
-import org.apache.camel.component.kafka.KafkaManualCommit;
-import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -46,7 +44,7 @@ public class DefaultKafkaManualSyncCommit extends 
DefaultKafkaManualCommit imple
     }
 
     protected void commitOffset(StateRepository<String, String> 
offsetRepository, TopicPartition partition, long recordOffset) {
-        if (recordOffset != KafkaRecordProcessor.START_OFFSET) {
+        if (recordOffset != DefaultCommitManager.START_OFFSET) {
             if (offsetRepository != null) {
                 offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(recordOffset));
             } else {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
index 3984998..eff9eea 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kafka.consumer;
 
-import org.apache.camel.component.kafka.KafkaManualCommit;
-
 /**
  * Can be used for forcing async manual offset commit when using Kafka 
consumer.
  */
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java
similarity index 95%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java
index 5ed94f8..b85c073 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka;
+package org.apache.camel.component.kafka.consumer;
 
 /**
  * Can be used for forcing manual offset commit when using Kafka consumer.
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
similarity index 92%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
index daa1213..10d8893 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka;
+package org.apache.camel.component.kafka.consumer;
 
 import java.util.Collection;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index b83fe4d..df288d7 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -16,55 +16,38 @@
  */
 package org.apache.camel.component.kafka.consumer.support;
 
-import java.time.Duration;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.StreamSupport;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.KafkaManualCommit;
-import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaRecordProcessor {
-    public static final long START_OFFSET = -1;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRecordProcessor.class);
 
     private final boolean autoCommitEnabled;
     private final KafkaConfiguration configuration;
     private final Processor processor;
-    private final Consumer<?, ?> consumer;
-    private final KafkaManualCommitFactory manualCommitFactory;
-    private final String threadId;
-    private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits;
+    private final CommitManager commitManager;
 
-    public KafkaRecordProcessor(KafkaConfiguration configuration,
-                                Processor processor, Consumer<?, ?> consumer,
-                                KafkaManualCommitFactory manualCommitFactory,
-                                String threadId, 
ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
+    public KafkaRecordProcessor(KafkaConfiguration configuration, Processor 
processor, CommitManager commitManager) {
         this.autoCommitEnabled = configuration.isAutoCommitEnable();
         this.configuration = configuration;
         this.processor = processor;
-        this.consumer = consumer;
-        this.manualCommitFactory = manualCommitFactory;
-        this.threadId = threadId;
-        this.asyncCommits = asyncCommits;
+        this.commitManager = commitManager;
     }
 
     private void setupExchangeMessage(Message message, ConsumerRecord record) {
@@ -115,11 +98,9 @@ public class KafkaRecordProcessor {
         }
 
         if (configuration.isAllowManualCommit()) {
-            StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
-
             // allow Camel users to access the Kafka consumer API to be able 
to do for example manual commits
-            KafkaManualCommit manual = 
manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId,
-                    offsetRepository, partition, record.offset(), 
configuration.getCommitTimeoutMs(), asyncCommits);
+            KafkaManualCommit manual = commitManager.getManualCommit(exchange, 
partition, record);
+
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
             message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext 
&& !partitionHasNext);
         }
@@ -151,7 +132,7 @@ public class KafkaRecordProcessor {
             }
 
             // force commit, so we resume on next poll where we failed
-            commitOffset(partition, partitionLastOffset, false, true);
+            commitManager.commitOffsetForce(partition, partitionLastOffset);
 
             // continue to next partition
             return true;
@@ -163,106 +144,10 @@ public class KafkaRecordProcessor {
         return false;
     }
 
-    public void commitOffset(
-            TopicPartition partition, long partitionLastOffset, boolean 
stopping, boolean forceCommit) {
-        commitOffset(configuration, consumer, partition, partitionLastOffset, 
stopping, forceCommit, threadId);
-    }
-
-    public static void commitOffset(
-            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition, long partitionLastOffset,
-            boolean stopping, boolean forceCommit, String threadId) {
-
-        if (partitionLastOffset == START_OFFSET) {
-            return;
-        }
-
-        StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
-
-        if (!configuration.isAllowManualCommit() && offsetRepository != null) {
-            saveStateToOffsetRepository(partition, partitionLastOffset, 
threadId, offsetRepository);
-        } else if (stopping) {
-            // if we are stopping then react according to the configured option
-            if ("async".equals(configuration.getAutoCommitOnStop())) {
-                commitAsync(consumer, partition, partitionLastOffset, 
threadId);
-            } else if ("sync".equals(configuration.getAutoCommitOnStop())) {
-                commitSync(configuration, consumer, partition, 
partitionLastOffset, threadId);
-
-            } else if ("none".equals(configuration.getAutoCommitOnStop())) {
-                noCommit(partition, threadId);
-            }
-        } else if (forceCommit) {
-            forceSyncCommit(configuration, consumer, partition, 
partitionLastOffset, threadId);
-        }
-    }
-
-    private static void commitOffset(
-            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition,
-            long partitionLastOffset) {
-        long timeout = configuration.getCommitTimeoutMs();
-        consumer.commitSync(
-                Collections.singletonMap(partition, new 
OffsetAndMetadata(partitionLastOffset + 1)),
-                Duration.ofMillis(timeout));
-    }
-
-    private static void forceSyncCommit(
-            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition, long partitionLastOffset,
-            String threadId) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: 
{}]", threadId, partition.topic(),
-                    partition.partition(), partitionLastOffset);
-        }
-
-        commitOffset(configuration, consumer, partition, partitionLastOffset);
-    }
-
-    private static void noCommit(TopicPartition partition, String threadId) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commit on stop {} from topic {} is disabled 
(none)", threadId, partition.topic());
-        }
-    }
-
-    private static void commitSync(
-            KafkaConfiguration configuration, Consumer<?, ?> consumer, 
TopicPartition partition, long partitionLastOffset,
-            String threadId) {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commitSync on stop {} from topic {}", threadId, 
partition.topic());
-        }
-
-        commitOffset(configuration, consumer, partition, partitionLastOffset);
-    }
-
-    private static void commitAsync(
-            Consumer<?, ?> consumer, TopicPartition partition, long 
partitionLastOffset, String threadId) {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, 
partition.topic());
-        }
-
-        consumer.commitAsync(
-                Collections.singletonMap(partition, new 
OffsetAndMetadata(partitionLastOffset + 1)), null);
-    }
-
-    private static void saveStateToOffsetRepository(
-            TopicPartition partition, long partitionLastOffset, String 
threadId,
-            StateRepository<String, String> offsetRepository) {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Saving offset repository state {} [topic: {} partition: 
{} offset: {}]", threadId, partition.topic(),
-                    partition.partition(),
-                    partitionLastOffset);
-        }
-        offsetRepository.setState(serializeOffsetKey(partition), 
serializeOffsetValue(partitionLastOffset));
-    }
-
     public static String serializeOffsetKey(TopicPartition topicPartition) {
         return topicPartition.topic() + '/' + topicPartition.partition();
     }
 
-    public static String serializeOffsetValue(long offset) {
-        return String.valueOf(offset);
-    }
-
     public static long deserializeOffsetValue(String offset) {
         return Long.parseLong(offset);
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 9d77581..10d893e 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -21,11 +21,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit;
 import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -41,15 +40,17 @@ public class KafkaRecordProcessorFacade {
     private final Map<String, Long> lastProcessedOffset;
     private final String threadId;
     private final KafkaRecordProcessor kafkaRecordProcessor;
+    private final CommitManager commitManager;
 
     public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, 
Map<String, Long> lastProcessedOffset, String threadId,
-                                      
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
-                                      
ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
+                                      CommitManager commitManager) {
         this.camelKafkaConsumer = camelKafkaConsumer;
         this.lastProcessedOffset = lastProcessedOffset;
         this.threadId = threadId;
+        this.commitManager = commitManager;
+
+        kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
 
-        kafkaRecordProcessor = buildKafkaRecordProcessor(consumer, 
asyncCommits);
     }
 
     private boolean isStopping() {
@@ -83,7 +84,7 @@ public class KafkaRecordProcessorFacade {
             if (!lastResult.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
-                kafkaRecordProcessor.commitOffset(partition, 
lastResult.getPartitionLastOffset(), false, false);
+                commitManager.commitOffset(partition, 
lastResult.getPartitionLastOffset());
             }
         }
 
@@ -136,13 +137,10 @@ public class KafkaRecordProcessorFacade {
         }
     }
 
-    private KafkaRecordProcessor buildKafkaRecordProcessor(
-            org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
-            ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
+    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager 
commitManager) {
         return new KafkaRecordProcessor(
                 camelKafkaConsumer.getEndpoint().getConfiguration(),
                 camelKafkaConsumer.getProcessor(),
-                consumer,
-                
camelKafkaConsumer.getEndpoint().getKafkaManualCommitFactory(), threadId, 
asyncCommits);
+                commitManager);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 51854b1..b034293 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
@@ -38,15 +39,17 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
+    private final CommitManager commitManager;
     private Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration 
configuration,
                                        Consumer consumer, Map<String, Long> 
lastProcessedOffset,
-                                       Supplier<Boolean> stopStateSupplier) {
+                                       Supplier<Boolean> stopStateSupplier, 
CommitManager commitManager) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
+        this.commitManager = commitManager;
         this.stopStateSupplier = stopStateSupplier;
 
         this.resumeStrategy = 
ResumeStrategyFactory.newResumeStrategy(configuration);
@@ -69,7 +72,12 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
             try {
                 // only commit offsets if the component has control
                 if (configuration.getAutoCommitEnable()) {
-                    KafkaRecordProcessor.commitOffset(configuration, consumer, 
partition, offset, stopping, false, threadId);
+                    if (stopping) {
+                        commitManager.commitOffsetOnStop(partition, offset);
+                    } else {
+                        commitManager.commitOffset(partition, offset);
+                    }
+
                 }
             } catch (Exception e) {
                 LOG.error("Error saving offset repository state {} from 
offsetKey {} with offset: {}", threadId, offsetKey,
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index a413a4a..79934d0 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,8 +17,10 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
+import org.apache.camel.component.kafka.consumer.DefaultCommitManager;
+
 public final class ProcessingResult {
-    private static final ProcessingResult UNPROCESSED_RESULT = new 
ProcessingResult(false, KafkaRecordProcessor.START_OFFSET);
+    private static final ProcessingResult UNPROCESSED_RESULT = new 
ProcessingResult(false, DefaultCommitManager.START_OFFSET);
 
     private final boolean breakOnErrorHit;
     private final long partitionLastOffset;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 960b198..6f2a642 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -25,10 +25,10 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
-import 
org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.KafkaManualCommit;
-import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import 
org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
index e430ea3..5334aa5 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
@@ -23,7 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc
index 6632c5e..c423441 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc
@@ -181,6 +181,8 @@ The following classes were moved from 
`org.apache.camel.component.kafka` to `org
 * DefaultKafkaManualSyncCommit
 * DefaultKafkaManualSyncCommitFactory
 * KafkaAsyncManualCommit
+* KafkaManualCommit
+* KafkaManualCommitFactory
 
 === camel-karaf
 
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 2ce40d2..b608f8a 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -21,6 +21,7 @@ import org.apache.camel.Component;
 import org.apache.camel.builder.component.AbstractComponentBuilder;
 import org.apache.camel.builder.component.ComponentBuilder;
 import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 
 /**
  * Sent and receive messages to/from an Apache Kafka broker.
@@ -789,7 +790,7 @@ public interface KafkaComponentBuilderFactory {
          * box.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.kafka.KafkaManualCommitFactory&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory&lt;/code&gt;
 type.
          * 
          * Group: consumer (advanced)
          * 
@@ -797,7 +798,7 @@ public interface KafkaComponentBuilderFactory {
          * @return the dsl builder
          */
         default KafkaComponentBuilder kafkaManualCommitFactory(
-                org.apache.camel.component.kafka.KafkaManualCommitFactory 
kafkaManualCommitFactory) {
+                KafkaManualCommitFactory kafkaManualCommitFactory) {
             doSetProperty("kafkaManualCommitFactory", 
kafkaManualCommitFactory);
             return this;
         }
@@ -2046,7 +2047,7 @@ public interface KafkaComponentBuilderFactory {
             case "specificAvroReader": 
getOrCreateConfiguration((KafkaComponent) 
component).setSpecificAvroReader((boolean) value); return true;
             case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) 
component).setTopicIsPattern((boolean) value); return true;
             case "valueDeserializer": 
getOrCreateConfiguration((KafkaComponent) 
component).setValueDeserializer((java.lang.String) value); return true;
-            case "kafkaManualCommitFactory": ((KafkaComponent) 
component).setKafkaManualCommitFactory((org.apache.camel.component.kafka.KafkaManualCommitFactory)
 value); return true;
+            case "kafkaManualCommitFactory": ((KafkaComponent) 
component).setKafkaManualCommitFactory((KafkaManualCommitFactory) value); 
return true;
             case "pollExceptionStrategy": ((KafkaComponent) 
component).setPollExceptionStrategy((org.apache.camel.component.kafka.PollExceptionStrategy)
 value); return true;
             case "bufferMemorySize": getOrCreateConfiguration((KafkaComponent) 
component).setBufferMemorySize((java.lang.Integer) value); return true;
             case "compressionCodec": getOrCreateConfiguration((KafkaComponent) 
component).setCompressionCodec((java.lang.String) value); return true;
@@ -2115,4 +2116,4 @@ public interface KafkaComponentBuilderFactory {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index b1ffc6a..984cd5a 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -16,15 +16,14 @@
  */
 package org.apache.camel.builder.endpoint.dsl;
 
-import java.util.*;
 import java.util.Map;
 import java.util.concurrent.*;
-import java.util.function.*;
-import java.util.stream.*;
+
 import javax.annotation.Generated;
 import org.apache.camel.builder.EndpointConsumerBuilder;
 import org.apache.camel.builder.EndpointProducerBuilder;
 import org.apache.camel.builder.endpoint.AbstractEndpointBuilder;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 
 /**
  * Sent and receive messages to/from an Apache Kafka broker.
@@ -1858,7 +1857,7 @@ public interface KafkaEndpointBuilderFactory {
          * box.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.kafka.KafkaManualCommitFactory&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory&lt;/code&gt;
 type.
          * 
          * Group: consumer (advanced)
          * 
@@ -1866,7 +1865,7 @@ public interface KafkaEndpointBuilderFactory {
          * @return the dsl builder
          */
         default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory(
-                org.apache.camel.component.kafka.KafkaManualCommitFactory 
kafkaManualCommitFactory) {
+                KafkaManualCommitFactory kafkaManualCommitFactory) {
             doSetProperty("kafkaManualCommitFactory", 
kafkaManualCommitFactory);
             return this;
         }
@@ -1878,7 +1877,7 @@ public interface KafkaEndpointBuilderFactory {
          * box.
          * 
          * The option will be converted to a
-         * 
&lt;code&gt;org.apache.camel.component.kafka.KafkaManualCommitFactory&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory&lt;/code&gt;
 type.
          * 
          * Group: consumer (advanced)
          * 
@@ -4728,4 +4727,4 @@ public interface KafkaEndpointBuilderFactory {
         }
         return new KafkaEndpointBuilderImpl(path);
     }
-}
\ No newline at end of file
+}

Reply via email to