This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit f9a9f749fea55c244745da6d4f7e191f47299a85 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Mon Mar 28 12:27:52 2022 +0200 CAMEL-17802: remove the autoCommitOnStop because the consumer already auto-commits on the call to close --- .../component/kafka/KafkaComponentConfigurer.java | 6 - .../component/kafka/KafkaEndpointConfigurer.java | 6 - .../component/kafka/KafkaEndpointUriFactory.java | 177 ++++++++++----------- .../org/apache/camel/component/kafka/kafka.json | 2 - .../camel/component/kafka/KafkaConfiguration.java | 15 -- .../camel/component/kafka/KafkaFetchRecords.java | 2 + .../component/kafka/consumer/CommitManagers.java | 8 +- 7 files changed, 91 insertions(+), 125 deletions(-) diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index 8138ecd..0ac2077 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -36,8 +36,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "autoCommitEnable": getOrCreateConfiguration(target).setAutoCommitEnable(property(camelContext, boolean.class, value)); return true; case "autocommitintervalms": case "autoCommitIntervalMs": getOrCreateConfiguration(target).setAutoCommitIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true; - case "autocommitonstop": - case "autoCommitOnStop": getOrCreateConfiguration(target).setAutoCommitOnStop(property(camelContext, java.lang.String.class, value)); return true; case "autooffsetreset": case "autoOffsetReset": getOrCreateConfiguration(target).setAutoOffsetReset(property(camelContext, java.lang.String.class, value)); return true; case "autowiredenabled": @@ -256,8 +254,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "autoCommitEnable": return boolean.class; case "autocommitintervalms": case "autoCommitIntervalMs": return java.lang.Integer.class; - case "autocommitonstop": - case "autoCommitOnStop": return java.lang.String.class; case "autooffsetreset": case "autoOffsetReset": return java.lang.String.class; case "autowiredenabled": @@ -472,8 +468,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "autoCommitEnable": return getOrCreateConfiguration(target).isAutoCommitEnable(); case "autocommitintervalms": case "autoCommitIntervalMs": return getOrCreateConfiguration(target).getAutoCommitIntervalMs(); - case "autocommitonstop": - case "autoCommitOnStop": return getOrCreateConfiguration(target).getAutoCommitOnStop(); case "autooffsetreset": case "autoOffsetReset": return getOrCreateConfiguration(target).getAutoOffsetReset(); case "autowiredenabled": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index 2ee0b05..b36f1d0 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -29,8 +29,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "autoCommitEnable": target.getConfiguration().setAutoCommitEnable(property(camelContext, boolean.class, value)); return true; case "autocommitintervalms": case "autoCommitIntervalMs": target.getConfiguration().setAutoCommitIntervalMs(property(camelContext, java.lang.Integer.class, value)); return true; - case "autocommitonstop": - case "autoCommitOnStop": target.getConfiguration().setAutoCommitOnStop(property(camelContext, java.lang.String.class, value)); return true; case "autooffsetreset": case "autoOffsetReset": target.getConfiguration().setAutoOffsetReset(property(camelContext, java.lang.String.class, value)); return true; case "breakonfirsterror": @@ -233,8 +231,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "autoCommitEnable": return boolean.class; case "autocommitintervalms": case "autoCommitIntervalMs": return java.lang.Integer.class; - case "autocommitonstop": - case "autoCommitOnStop": return java.lang.String.class; case "autooffsetreset": case "autoOffsetReset": return java.lang.String.class; case "breakonfirsterror": @@ -438,8 +434,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "autoCommitEnable": return target.getConfiguration().isAutoCommitEnable(); case "autocommitintervalms": case "autoCommitIntervalMs": return target.getConfiguration().getAutoCommitIntervalMs(); - case "autocommitonstop": - case "autoCommitOnStop": return target.getConfiguration().getAutoCommitOnStop(); case "autooffsetreset": case "autoOffsetReset": return target.getConfiguration().getAutoOffsetReset(); case "breakonfirsterror": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java index 667b5e3..0b40860 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java @@ -21,115 +21,114 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(102); - props.add("additionalProperties"); + Set<String> props = new HashSet<>(101); + props.add("synchronous"); + props.add("queueBufferingMaxMessages"); props.add("allowManualCommit"); - props.add("autoCommitEnable"); - props.add("autoCommitIntervalMs"); - props.add("autoCommitOnStop"); - props.add("autoOffsetReset"); + props.add("consumersCount"); + props.add("receiveBufferBytes"); + props.add("reconnectBackoffMaxMs"); + props.add("valueDeserializer"); + props.add("metricReporters"); + props.add("sslTruststoreType"); + props.add("sendBufferBytes"); + props.add("heartbeatIntervalMs"); + props.add("interceptorClasses"); + props.add("sslKeystoreType"); props.add("breakOnFirstError"); - props.add("bridgeErrorHandler"); + props.add("requestRequiredAcks"); + props.add("enableIdempotence"); + props.add("pollOnError"); + props.add("fetchWaitMaxMs"); + props.add("retries"); + props.add("maxPollRecords"); + props.add("additionalProperties"); + props.add("keyDeserializer"); + props.add("producerBatchSize"); + props.add("retryBackoffMs"); props.add("brokers"); - props.add("bufferMemorySize"); - props.add("checkCrcs"); + props.add("metricsSampleWindowMs"); + props.add("sslContextParameters"); + props.add("sslKeyPassword"); + props.add("keySerializer"); + props.add("noOfMetricsSample"); + props.add("maxPartitionFetchBytes"); + props.add("partitionKey"); + props.add("headerFilterStrategy"); + props.add("sslTruststorePassword"); + props.add("sessionTimeoutMs"); + props.add("key"); + props.add("topicIsPattern"); + props.add("sslTruststoreLocation"); props.add("clientId"); - props.add("commitTimeoutMs"); + props.add("maxRequestSize"); + props.add("recordMetadata"); + props.add("sslTrustmanagerAlgorithm"); props.add("compressionCodec"); - props.add("connectionMaxIdleMs"); + props.add("commitTimeoutMs"); + props.add("workerPoolCoreSize"); + props.add("autoCommitEnable"); props.add("consumerRequestTimeoutMs"); - props.add("consumersCount"); - props.add("deliveryTimeoutMs"); - props.add("enableIdempotence"); - props.add("exceptionHandler"); - props.add("exchangePattern"); - props.add("fetchMaxBytes"); - props.add("fetchMinBytes"); - props.add("fetchWaitMaxMs"); - props.add("groupId"); - props.add("groupInstanceId"); - props.add("headerDeserializer"); - props.add("headerFilterStrategy"); - props.add("headerSerializer"); - props.add("heartbeatIntervalMs"); - props.add("interceptorClasses"); - props.add("kafkaClientFactory"); - props.add("kafkaManualCommitFactory"); - props.add("kerberosBeforeReloginMinTime"); + props.add("maxPollIntervalMs"); props.add("kerberosInitCmd"); - props.add("kerberosPrincipalToLocalRules"); + props.add("workerPoolMaxSize"); + props.add("reconnectBackoffMs"); + props.add("groupId"); + props.add("offsetRepository"); props.add("kerberosRenewJitter"); - props.add("kerberosRenewWindowFactor"); - props.add("key"); - props.add("keyDeserializer"); - props.add("keySerializer"); + props.add("sslProvider"); + props.add("saslKerberosServiceName"); + props.add("bridgeErrorHandler"); + props.add("shutdownTimeout"); + props.add("saslMechanism"); + props.add("workerPool"); + props.add("deliveryTimeoutMs"); props.add("lazyStartProducer"); - props.add("lingerMs"); + props.add("sslKeystorePassword"); + props.add("kafkaManualCommitFactory"); + props.add("sslEndpointAlgorithm"); + props.add("topic"); + props.add("sslProtocol"); + props.add("sslKeymanagerAlgorithm"); + props.add("pollTimeoutMs"); + props.add("exceptionHandler"); props.add("maxBlockMs"); - props.add("maxInFlightRequest"); - props.add("maxPartitionFetchBytes"); - props.add("maxPollIntervalMs"); - props.add("maxPollRecords"); - props.add("maxRequestSize"); + props.add("kerberosBeforeReloginMinTime"); + props.add("groupInstanceId"); + props.add("bufferMemorySize"); props.add("metadataMaxAgeMs"); - props.add("metricReporters"); - props.add("metricsSampleWindowMs"); - props.add("noOfMetricsSample"); - props.add("offsetRepository"); - props.add("partitionAssignor"); - props.add("partitionKey"); - props.add("partitioner"); - props.add("pollOnError"); - props.add("pollTimeoutMs"); - props.add("producerBatchSize"); - props.add("queueBufferingMaxMessages"); - props.add("receiveBufferBytes"); - props.add("reconnectBackoffMaxMs"); - props.add("reconnectBackoffMs"); - props.add("recordMetadata"); - props.add("requestRequiredAcks"); - props.add("requestTimeoutMs"); - props.add("retries"); - props.add("retryBackoffMs"); + props.add("sslCipherSuites"); + props.add("specificAvroReader"); props.add("saslJaasConfig"); - props.add("saslKerberosServiceName"); - props.add("saslMechanism"); - props.add("schemaRegistryURL"); + props.add("fetchMinBytes"); + props.add("connectionMaxIdleMs"); + props.add("lingerMs"); + props.add("kerberosRenewWindowFactor"); props.add("securityProtocol"); - props.add("seekTo"); - props.add("sendBufferBytes"); - props.add("sessionTimeoutMs"); - props.add("shutdownTimeout"); - props.add("specificAvroReader"); - props.add("sslCipherSuites"); - props.add("sslContextParameters"); + props.add("autoCommitIntervalMs"); + props.add("partitioner"); + props.add("kerberosPrincipalToLocalRules"); + props.add("headerSerializer"); props.add("sslEnabledProtocols"); - props.add("sslEndpointAlgorithm"); - props.add("sslKeyPassword"); - props.add("sslKeymanagerAlgorithm"); props.add("sslKeystoreLocation"); - props.add("sslKeystorePassword"); - props.add("sslKeystoreType"); - props.add("sslProtocol"); - props.add("sslProvider"); - props.add("sslTrustmanagerAlgorithm"); - props.add("sslTruststoreLocation"); - props.add("sslTruststorePassword"); - props.add("sslTruststoreType"); - props.add("synchronous"); - props.add("topic"); - props.add("topicIsPattern"); - props.add("valueDeserializer"); + props.add("schemaRegistryURL"); + props.add("headerDeserializer"); + props.add("maxInFlightRequest"); + props.add("exchangePattern"); props.add("valueSerializer"); - props.add("workerPool"); - props.add("workerPoolCoreSize"); - props.add("workerPoolMaxSize"); + props.add("autoOffsetReset"); + props.add("seekTo"); + props.add("kafkaClientFactory"); + props.add("requestTimeoutMs"); + props.add("fetchMaxBytes"); + props.add("checkCrcs"); + props.add("partitionAssignor"); PROPERTY_NAMES = Collections.unmodifiableSet(props); Set<String> secretProps = new HashSet<>(4); - secretProps.add("saslJaasConfig"); - secretProps.add("sslKeyPassword"); secretProps.add("sslKeystorePassword"); secretProps.add("sslTruststorePassword"); + secretProps.add("saslJaasConfig"); + secretProps.add("sslKeyPassword"); SECRET_PROPERTY_NAMES = Collections.unmodifiableSet(secretProps); Set<String> prefixes = new HashSet<>(1); prefixes.add("additionalProperties."); diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 8e567ff..f3ed143 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -32,7 +32,6 @@ "allowManualCommit": { "kind": "property", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then [...] "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consum [...] "autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." }, - "autoCommitOnStop": { "kind": "property", "displayName": "Auto Commit On Stop", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sync", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to perform an explicit auto commit whe [...] "autoOffsetReset": { "kind": "property", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do when there is no initial offset [...] "breakOnFirstError": { "kind": "property", "displayName": "Break On First Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This options controls what happens when a consumer is processing an exchange and it fails. [...] "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] @@ -141,7 +140,6 @@ "allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled the [...] "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit Enable", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "If true, periodically commit to ZooKeeper the offset of messages already fetched by the consu [...] "autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit Interval Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The frequency in ms that the consumer offsets are committed to zookeeper." }, - "autoCommitOnStop": { "kind": "parameter", "displayName": "Auto Commit On Stop", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "sync", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether to perform an explicit auto commit wh [...] "autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset Reset", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "latest", "earliest", "none" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "latest", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do when there is no initial offse [...] "breakOnFirstError": { "kind": "parameter", "displayName": "Break On First Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This options controls what happens when a consumer is processing an exchange and it fails [...] "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 28f6e56..37f394b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -137,8 +137,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware private boolean autoCommitEnable = true; @UriParam(label = "consumer") private boolean allowManualCommit; - @UriParam(label = "consumer", defaultValue = "sync", enums = "sync,async,none") - private String autoCommitOnStop = "sync"; @UriParam(label = "consumer") private boolean breakOnFirstError; @UriParam(label = "consumer") @@ -805,19 +803,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware this.autoOffsetReset = autoOffsetReset; } - public String getAutoCommitOnStop() { - return autoCommitOnStop; - } - - /** - * Whether to perform an explicit auto commit when the consumer stops to ensure the broker has a commit from the - * last consumed message. This requires the option autoCommitEnable is turned on. The possible values are: sync, - * async, or none. And sync is the default value. - */ - public void setAutoCommitOnStop(String autoCommitOnStop) { - this.autoCommitOnStop = autoCommitOnStop; - } - public boolean isBreakOnFirstError() { return breakOnFirstError; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 7b4a342..d324766 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -512,6 +512,8 @@ class KafkaFetchRecords implements Runnable { consumer.wakeup(); Thread.currentThread().interrupt(); } finally { + IOHelper.close(consumer); + if (lock.isHeldByCurrentThread()) { lock.unlock(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java index 2506ea4..258e393 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java @@ -34,13 +34,7 @@ public final class CommitManagers { } if (configuration.isAutoCommitEnable()) { - if ("async".equals(configuration.getAutoCommitOnStop())) { - return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic); - } else if ("sync".equals(configuration.getAutoCommitOnStop())) { - return new SyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic); - } else if ("none".equals(configuration.getAutoCommitOnStop())) { - return new NoopCommitManager(consumer, kafkaConsumer, threadId, printableTopic); - } + return new AsyncCommitManager(consumer, kafkaConsumer, threadId, printableTopic); } KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory();