This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f9a9f749fea55c244745da6d4f7e191f47299a85
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Mon Mar 28 12:27:52 2022 +0200

    CAMEL-17802: remove the autoCommitOnStop option because the consumer already auto-commits on the call to close
---
 .../component/kafka/KafkaComponentConfigurer.java  |   6 -
 .../component/kafka/KafkaEndpointConfigurer.java   |   6 -
 .../component/kafka/KafkaEndpointUriFactory.java   | 177 ++++++++++-----------
 .../org/apache/camel/component/kafka/kafka.json    |   2 -
 .../camel/component/kafka/KafkaConfiguration.java  |  15 --
 .../camel/component/kafka/KafkaFetchRecords.java   |   2 +
 .../component/kafka/consumer/CommitManagers.java   |   8 +-
 7 files changed, 91 insertions(+), 125 deletions(-)

diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 8138ecd..0ac2077 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -36,8 +36,6 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoCommitEnable": 
getOrCreateConfiguration(target).setAutoCommitEnable(property(camelContext, 
boolean.class, value)); return true;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": 
getOrCreateConfiguration(target).setAutoCommitIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
-        case "autocommitonstop":
-        case "autoCommitOnStop": 
getOrCreateConfiguration(target).setAutoCommitOnStop(property(camelContext, 
java.lang.String.class, value)); return true;
         case "autooffsetreset":
         case "autoOffsetReset": 
getOrCreateConfiguration(target).setAutoOffsetReset(property(camelContext, 
java.lang.String.class, value)); return true;
         case "autowiredenabled":
@@ -256,8 +254,6 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoCommitEnable": return boolean.class;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return java.lang.Integer.class;
-        case "autocommitonstop":
-        case "autoCommitOnStop": return java.lang.String.class;
         case "autooffsetreset":
         case "autoOffsetReset": return java.lang.String.class;
         case "autowiredenabled":
@@ -472,8 +468,6 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "autoCommitEnable": return 
getOrCreateConfiguration(target).isAutoCommitEnable();
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return 
getOrCreateConfiguration(target).getAutoCommitIntervalMs();
-        case "autocommitonstop":
-        case "autoCommitOnStop": return 
getOrCreateConfiguration(target).getAutoCommitOnStop();
         case "autooffsetreset":
         case "autoOffsetReset": return 
getOrCreateConfiguration(target).getAutoOffsetReset();
         case "autowiredenabled":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 2ee0b05..b36f1d0 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -29,8 +29,6 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitEnable": 
target.getConfiguration().setAutoCommitEnable(property(camelContext, 
boolean.class, value)); return true;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": 
target.getConfiguration().setAutoCommitIntervalMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
-        case "autocommitonstop":
-        case "autoCommitOnStop": 
target.getConfiguration().setAutoCommitOnStop(property(camelContext, 
java.lang.String.class, value)); return true;
         case "autooffsetreset":
         case "autoOffsetReset": 
target.getConfiguration().setAutoOffsetReset(property(camelContext, 
java.lang.String.class, value)); return true;
         case "breakonfirsterror":
@@ -233,8 +231,6 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitEnable": return boolean.class;
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return java.lang.Integer.class;
-        case "autocommitonstop":
-        case "autoCommitOnStop": return java.lang.String.class;
         case "autooffsetreset":
         case "autoOffsetReset": return java.lang.String.class;
         case "breakonfirsterror":
@@ -438,8 +434,6 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "autoCommitEnable": return 
target.getConfiguration().isAutoCommitEnable();
         case "autocommitintervalms":
         case "autoCommitIntervalMs": return 
target.getConfiguration().getAutoCommitIntervalMs();
-        case "autocommitonstop":
-        case "autoCommitOnStop": return 
target.getConfiguration().getAutoCommitOnStop();
         case "autooffsetreset":
         case "autoOffsetReset": return 
target.getConfiguration().getAutoOffsetReset();
         case "breakonfirsterror":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 667b5e3..0b40860 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,115 +21,114 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(102);
-        props.add("additionalProperties");
+        Set<String> props = new HashSet<>(101);
+        props.add("synchronous");
+        props.add("queueBufferingMaxMessages");
         props.add("allowManualCommit");
-        props.add("autoCommitEnable");
-        props.add("autoCommitIntervalMs");
-        props.add("autoCommitOnStop");
-        props.add("autoOffsetReset");
+        props.add("consumersCount");
+        props.add("receiveBufferBytes");
+        props.add("reconnectBackoffMaxMs");
+        props.add("valueDeserializer");
+        props.add("metricReporters");
+        props.add("sslTruststoreType");
+        props.add("sendBufferBytes");
+        props.add("heartbeatIntervalMs");
+        props.add("interceptorClasses");
+        props.add("sslKeystoreType");
         props.add("breakOnFirstError");
-        props.add("bridgeErrorHandler");
+        props.add("requestRequiredAcks");
+        props.add("enableIdempotence");
+        props.add("pollOnError");
+        props.add("fetchWaitMaxMs");
+        props.add("retries");
+        props.add("maxPollRecords");
+        props.add("additionalProperties");
+        props.add("keyDeserializer");
+        props.add("producerBatchSize");
+        props.add("retryBackoffMs");
         props.add("brokers");
-        props.add("bufferMemorySize");
-        props.add("checkCrcs");
+        props.add("metricsSampleWindowMs");
+        props.add("sslContextParameters");
+        props.add("sslKeyPassword");
+        props.add("keySerializer");
+        props.add("noOfMetricsSample");
+        props.add("maxPartitionFetchBytes");
+        props.add("partitionKey");
+        props.add("headerFilterStrategy");
+        props.add("sslTruststorePassword");
+        props.add("sessionTimeoutMs");
+        props.add("key");
+        props.add("topicIsPattern");
+        props.add("sslTruststoreLocation");
         props.add("clientId");
-        props.add("commitTimeoutMs");
+        props.add("maxRequestSize");
+        props.add("recordMetadata");
+        props.add("sslTrustmanagerAlgorithm");
         props.add("compressionCodec");
-        props.add("connectionMaxIdleMs");
+        props.add("commitTimeoutMs");
+        props.add("workerPoolCoreSize");
+        props.add("autoCommitEnable");
         props.add("consumerRequestTimeoutMs");
-        props.add("consumersCount");
-        props.add("deliveryTimeoutMs");
-        props.add("enableIdempotence");
-        props.add("exceptionHandler");
-        props.add("exchangePattern");
-        props.add("fetchMaxBytes");
-        props.add("fetchMinBytes");
-        props.add("fetchWaitMaxMs");
-        props.add("groupId");
-        props.add("groupInstanceId");
-        props.add("headerDeserializer");
-        props.add("headerFilterStrategy");
-        props.add("headerSerializer");
-        props.add("heartbeatIntervalMs");
-        props.add("interceptorClasses");
-        props.add("kafkaClientFactory");
-        props.add("kafkaManualCommitFactory");
-        props.add("kerberosBeforeReloginMinTime");
+        props.add("maxPollIntervalMs");
         props.add("kerberosInitCmd");
-        props.add("kerberosPrincipalToLocalRules");
+        props.add("workerPoolMaxSize");
+        props.add("reconnectBackoffMs");
+        props.add("groupId");
+        props.add("offsetRepository");
         props.add("kerberosRenewJitter");
-        props.add("kerberosRenewWindowFactor");
-        props.add("key");
-        props.add("keyDeserializer");
-        props.add("keySerializer");
+        props.add("sslProvider");
+        props.add("saslKerberosServiceName");
+        props.add("bridgeErrorHandler");
+        props.add("shutdownTimeout");
+        props.add("saslMechanism");
+        props.add("workerPool");
+        props.add("deliveryTimeoutMs");
         props.add("lazyStartProducer");
-        props.add("lingerMs");
+        props.add("sslKeystorePassword");
+        props.add("kafkaManualCommitFactory");
+        props.add("sslEndpointAlgorithm");
+        props.add("topic");
+        props.add("sslProtocol");
+        props.add("sslKeymanagerAlgorithm");
+        props.add("pollTimeoutMs");
+        props.add("exceptionHandler");
         props.add("maxBlockMs");
-        props.add("maxInFlightRequest");
-        props.add("maxPartitionFetchBytes");
-        props.add("maxPollIntervalMs");
-        props.add("maxPollRecords");
-        props.add("maxRequestSize");
+        props.add("kerberosBeforeReloginMinTime");
+        props.add("groupInstanceId");
+        props.add("bufferMemorySize");
         props.add("metadataMaxAgeMs");
-        props.add("metricReporters");
-        props.add("metricsSampleWindowMs");
-        props.add("noOfMetricsSample");
-        props.add("offsetRepository");
-        props.add("partitionAssignor");
-        props.add("partitionKey");
-        props.add("partitioner");
-        props.add("pollOnError");
-        props.add("pollTimeoutMs");
-        props.add("producerBatchSize");
-        props.add("queueBufferingMaxMessages");
-        props.add("receiveBufferBytes");
-        props.add("reconnectBackoffMaxMs");
-        props.add("reconnectBackoffMs");
-        props.add("recordMetadata");
-        props.add("requestRequiredAcks");
-        props.add("requestTimeoutMs");
-        props.add("retries");
-        props.add("retryBackoffMs");
+        props.add("sslCipherSuites");
+        props.add("specificAvroReader");
         props.add("saslJaasConfig");
-        props.add("saslKerberosServiceName");
-        props.add("saslMechanism");
-        props.add("schemaRegistryURL");
+        props.add("fetchMinBytes");
+        props.add("connectionMaxIdleMs");
+        props.add("lingerMs");
+        props.add("kerberosRenewWindowFactor");
         props.add("securityProtocol");
-        props.add("seekTo");
-        props.add("sendBufferBytes");
-        props.add("sessionTimeoutMs");
-        props.add("shutdownTimeout");
-        props.add("specificAvroReader");
-        props.add("sslCipherSuites");
-        props.add("sslContextParameters");
+        props.add("autoCommitIntervalMs");
+        props.add("partitioner");
+        props.add("kerberosPrincipalToLocalRules");
+        props.add("headerSerializer");
         props.add("sslEnabledProtocols");
-        props.add("sslEndpointAlgorithm");
-        props.add("sslKeyPassword");
-        props.add("sslKeymanagerAlgorithm");
         props.add("sslKeystoreLocation");
-        props.add("sslKeystorePassword");
-        props.add("sslKeystoreType");
-        props.add("sslProtocol");
-        props.add("sslProvider");
-        props.add("sslTrustmanagerAlgorithm");
-        props.add("sslTruststoreLocation");
-        props.add("sslTruststorePassword");
-        props.add("sslTruststoreType");
-        props.add("synchronous");
-        props.add("topic");
-        props.add("topicIsPattern");
-        props.add("valueDeserializer");
+        props.add("schemaRegistryURL");
+        props.add("headerDeserializer");
+        props.add("maxInFlightRequest");
+        props.add("exchangePattern");
         props.add("valueSerializer");
-        props.add("workerPool");
-        props.add("workerPoolCoreSize");
-        props.add("workerPoolMaxSize");
+        props.add("autoOffsetReset");
+        props.add("seekTo");
+        props.add("kafkaClientFactory");
+        props.add("requestTimeoutMs");
+        props.add("fetchMaxBytes");
+        props.add("checkCrcs");
+        props.add("partitionAssignor");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         Set<String> secretProps = new HashSet<>(4);
-        secretProps.add("saslJaasConfig");
-        secretProps.add("sslKeyPassword");
         secretProps.add("sslKeystorePassword");
         secretProps.add("sslTruststorePassword");
+        secretProps.add("saslJaasConfig");
+        secretProps.add("sslKeyPassword");
         SECRET_PROPERTY_NAMES = Collections.unmodifiableSet(secretProps);
         Set<String> prefixes = new HashSet<>(1);
         prefixes.add("additionalProperties.");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 8e567ff..f3ed143 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -32,7 +32,6 @@
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled then [...]
     "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched by the consum [...]
     "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
-    "autoCommitOnStop": { "kind": "property", "displayName": "Auto Commit On 
Stop", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"sync", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to perform an explicit auto commit whe 
[...]
     "autoOffsetReset": { "kind": "property", "displayName": "Auto Offset 
Reset", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", 
"none" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "latest", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "What to do when there is no initial offset 
[...]
     "breakOnFirstError": { "kind": "property", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails. [...]
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
@@ -141,7 +140,6 @@
     "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual 
Commit", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to allow doing manual commits via 
KafkaManualCommit. If this option is enabled the [...]
     "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit 
Enable", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "If true, periodically commit to ZooKeeper the 
offset of messages already fetched by the consu [...]
     "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit 
Interval Ms", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "5000", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "The frequency in ms that 
the consumer offsets are committed to zookeeper." },
-    "autoCommitOnStop": { "kind": "parameter", "displayName": "Auto Commit On 
Stop", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"sync", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Whether to perform an explicit auto commit wh 
[...]
     "autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset 
Reset", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", 
"none" ], "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "latest", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "What to do when there is no initial offse [...]
     "breakOnFirstError": { "kind": "parameter", "displayName": "Break On First 
Error", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This options controls what happens when a 
consumer is processing an exchange and it fails [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a m [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 28f6e56..37f394b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -137,8 +137,6 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private boolean autoCommitEnable = true;
     @UriParam(label = "consumer")
     private boolean allowManualCommit;
-    @UriParam(label = "consumer", defaultValue = "sync", enums = 
"sync,async,none")
-    private String autoCommitOnStop = "sync";
     @UriParam(label = "consumer")
     private boolean breakOnFirstError;
     @UriParam(label = "consumer")
@@ -805,19 +803,6 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.autoOffsetReset = autoOffsetReset;
     }
 
-    public String getAutoCommitOnStop() {
-        return autoCommitOnStop;
-    }
-
-    /**
-     * Whether to perform an explicit auto commit when the consumer stops to 
ensure the broker has a commit from the
-     * last consumed message. This requires the option autoCommitEnable is 
turned on. The possible values are: sync,
-     * async, or none. And sync is the default value.
-     */
-    public void setAutoCommitOnStop(String autoCommitOnStop) {
-        this.autoCommitOnStop = autoCommitOnStop;
-    }
-
     public boolean isBreakOnFirstError() {
         return breakOnFirstError;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 7b4a342..d324766 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -512,6 +512,8 @@ class KafkaFetchRecords implements Runnable {
             consumer.wakeup();
             Thread.currentThread().interrupt();
         } finally {
+            IOHelper.close(consumer);
+
             if (lock.isHeldByCurrentThread()) {
                 lock.unlock();
             }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
index 2506ea4..258e393 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java
@@ -34,13 +34,7 @@ public final class CommitManagers {
         }
 
         if (configuration.isAutoCommitEnable()) {
-            if ("async".equals(configuration.getAutoCommitOnStop())) {
-                return new AsyncCommitManager(consumer, kafkaConsumer, 
threadId, printableTopic);
-            } else if ("sync".equals(configuration.getAutoCommitOnStop())) {
-                return new SyncCommitManager(consumer, kafkaConsumer, 
threadId, printableTopic);
-            } else if ("none".equals(configuration.getAutoCommitOnStop())) {
-                return new NoopCommitManager(consumer, kafkaConsumer, 
threadId, printableTopic);
-            }
+            return new AsyncCommitManager(consumer, kafkaConsumer, threadId, 
printableTopic);
         }
 
         KafkaManualCommitFactory manualCommitFactory = 
kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();

Reply via email to