This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9bdbf7b6b394561058a7ef7198997244f43929d0 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Jan 19 11:40:26 2022 +0100 CAMEL-15562: created a commit manager in camel-kafka This reorganizes the commit management code so that we can unify managing different commit strategies. --- .../component/kafka/KafkaComponentConfigurer.java | 4 +- .../component/kafka/KafkaEndpointConfigurer.java | 4 +- .../org/apache/camel/component/kafka/kafka.json | 4 +- .../camel/component/kafka/KafkaComponent.java | 2 + .../camel/component/kafka/KafkaConfiguration.java | 1 + .../camel/component/kafka/KafkaEndpoint.java | 2 + .../camel/component/kafka/KafkaFetchRecords.java | 39 ++--- ...kaAsyncManualCommit.java => CommitManager.java} | 27 ++- .../kafka/consumer/DefaultCommitManager.java | 195 +++++++++++++++++++++ .../consumer/DefaultKafkaManualAsyncCommit.java | 3 +- .../DefaultKafkaManualAsyncCommitFactory.java | 2 - .../kafka/consumer/DefaultKafkaManualCommit.java | 1 - .../consumer/DefaultKafkaManualCommitFactory.java | 2 - .../consumer/DefaultKafkaManualSyncCommit.java | 4 +- .../kafka/consumer/KafkaAsyncManualCommit.java | 2 - .../kafka/{ => consumer}/KafkaManualCommit.java | 2 +- .../{ => consumer}/KafkaManualCommitFactory.java | 3 +- .../consumer/support/KafkaRecordProcessor.java | 131 +------------- .../support/KafkaRecordProcessorFacade.java | 20 +-- .../support/PartitionAssignmentListener.java | 12 +- .../kafka/consumer/support/ProcessingResult.java | 4 +- .../KafkaConsumerAsyncManualCommitIT.java | 6 +- .../integration/KafkaConsumerManualCommitIT.java | 2 +- .../ROOT/pages/camel-3x-upgrade-guide-3_15.adoc | 2 + .../dsl/KafkaComponentBuilderFactory.java | 9 +- .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 13 +- 26 files changed, 285 insertions(+), 211 deletions(-) diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index d8f6a58..0d5a1c2 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -91,7 +91,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "kafkaclientfactory": case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true; case "kafkamanualcommitfactory": - case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true; + case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class, value)); return true; case "kerberosbeforereloginmintime": case "kerberosBeforeReloginMinTime": getOrCreateConfiguration(target).setKerberosBeforeReloginMinTime(property(camelContext, java.lang.Integer.class, value)); return true; case "kerberosinitcmd": @@ -305,7 +305,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "kafkaclientfactory": case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class; case "kafkamanualcommitfactory": - case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class; + case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class; case "kerberosbeforereloginmintime": case "kerberosBeforeReloginMinTime": return java.lang.Integer.class; case "kerberosinitcmd": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index 7c3ef61..270e758 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -85,7 +85,7 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "kafkaclientfactory": case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true; case "kafkamanualcommitfactory": - case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true; + case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class, value)); return true; case "kerberosbeforereloginmintime": case "kerberosBeforeReloginMinTime": target.getConfiguration().setKerberosBeforeReloginMinTime(property(camelContext, java.lang.Integer.class, value)); return true; case "kerberosinitcmd": @@ -291,7 +291,7 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "kafkaclientfactory": case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class; case "kafkamanualcommitfactory": - case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class; + case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory.class; case "kerberosbeforereloginmintime": case "kerberosBeforeReloginMinTime": return java.lang.Integer.class; case "kerberosinitcmd": diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 4a3781f..ddaeb8b 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -61,7 +61,7 @@ "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...] "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...] "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...] - "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in cas [...] + "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": true, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instanc [...] "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." }, "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...] "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...] @@ -169,7 +169,7 @@ "valueDeserializer": { "kind": "parameter", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value t [...] "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in c [...] + "kafkaManualCommitFactory": { "kind": "parameter", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit insta [...] "bufferMemorySize": { "kind": "parameter", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to b [...] "compressionCodec": { "kind": "parameter", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify th [...] "connectionMaxIdleMs": { "kind": "parameter", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by thi [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index abe2a0c..dde3327 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -21,6 +21,8 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.SSLContextParametersAware; import org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 1e5d833..74a1cb1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 39efe6d..1821084 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -24,6 +24,8 @@ import org.apache.camel.Consumer; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; import org.apache.camel.spi.ClassResolver; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 5b529df..fe51691 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -22,12 +22,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; -import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit; +import org.apache.camel.component.kafka.consumer.CommitManager; +import org.apache.camel.component.kafka.consumer.DefaultCommitManager; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade; import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener; import org.apache.camel.component.kafka.consumer.support.ProcessingResult; @@ -53,7 +53,7 @@ class KafkaFetchRecords implements Runnable { private final PollExceptionStrategy pollExceptionStrategy; private final BridgeExceptionHandlerToErrorHandler bridge; private final ReentrantLock lock = new ReentrantLock(); - private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue<>(); + private CommitManager commitManager; private boolean retry = true; private boolean reconnect; // must be false at init (this is the policy whether to reconnect) @@ -84,6 +84,8 @@ class KafkaFetchRecords implements Runnable { initializeConsumer(); setConnected(true); + + commitManager = new DefaultCommitManager(consumer, kafkaConsumer, threadId, getPrintableTopic()); } } catch (Exception e) { setConnected(false); @@ -136,7 +138,7 @@ class KafkaFetchRecords implements Runnable { private void subscribe() { PartitionAssignmentListener listener = new PartitionAssignmentListener( threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, - this::isRunnable); + this::isRunnable, commitManager); if (LOG.isInfoEnabled()) { LOG.info("Subscribing {} to {}", threadId, getPrintableTopic()); @@ -166,13 +168,13 @@ class KafkaFetchRecords implements Runnable { } KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade( - kafkaConsumer, lastProcessedOffset, threadId, consumer, asyncCommits); + kafkaConsumer, lastProcessedOffset, threadId, commitManager); Duration pollDuration = Duration.ofMillis(pollTimeoutMs); while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) { ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); - processAsyncCommits(); + commitManager.processAsyncCommits(); ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); @@ -188,14 +190,14 @@ class KafkaFetchRecords implements Runnable { if (!isConnected()) { LOG.debug("Not reconnecting, check whether to auto-commit or not ..."); - commit(); + commitManager.commit(); } safeUnsubscribe(); } catch (InterruptException e) { kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); - commit(); + commitManager.commit(); LOG.info("Unsubscribing {} from {}", threadId, getPrintableTopic()); safeUnsubscribe(); @@ -230,12 +232,6 @@ class KafkaFetchRecords implements Runnable { } } - private void processAsyncCommits() { - while (!asyncCommits.isEmpty()) { - asyncCommits.poll().processAsyncCommit(); - } - } - private void handleAccordingToStrategy(long partitionLastOffset, Exception e) { PollOnError onError = pollExceptionStrategy.handleException(e); if (PollOnError.RETRY == onError) { @@ -276,21 +272,6 @@ class KafkaFetchRecords implements Runnable { } } - private void commit() { - processAsyncCommits(); - if (kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) { - if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from {}", threadId, getPrintableTopic()); - consumer.commitAsync(); - } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from {}", threadId, getPrintableTopic()); - consumer.commitSync(); - } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commit on stop {} from {} is disabled (none)", threadId, getPrintableTopic()); - } - } - } - private void handlePollStop() { // stop and terminate consumer LOG.warn("Requesting the consumer to stop based on polling exception strategy"); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java similarity index 58% copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java index 3984998..d9fdc0a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java @@ -14,16 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.camel.component.kafka.consumer; -import org.apache.camel.component.kafka.KafkaManualCommit; +import org.apache.camel.Exchange; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; -/** - * Can be used for forcing async manual offset commit when using Kafka consumer. - */ -public interface KafkaAsyncManualCommit extends KafkaManualCommit { - /** - * Used in the consumer loop to effectively call org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync() - */ - void processAsyncCommit(); +public interface CommitManager { + + KafkaManualCommit getManualCommit(Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record); + + void commitOffset(TopicPartition partition, long partitionLastOffset); + + void commitOffsetForce(TopicPartition partition, long partitionLastOffset); + + void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset); + + @Deprecated + void processAsyncCommits(); + + void commit(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java new file mode 100644 index 0000000..76ae5ea --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultCommitManager.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaConsumer; +import org.apache.camel.spi.StateRepository; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultCommitManager implements CommitManager { + public static final long START_OFFSET = -1; + + private static final Logger LOG = LoggerFactory.getLogger(DefaultCommitManager.class); + private final Consumer<?, ?> consumer; + private final KafkaConsumer kafkaConsumer; + private final String threadId; + private final String printableTopic; + private final KafkaConfiguration configuration; + + private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits = new ConcurrentLinkedQueue<>(); + + public DefaultCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) { + this.consumer = consumer; + this.kafkaConsumer = kafkaConsumer; + this.threadId = threadId; + this.printableTopic = printableTopic; + this.configuration = kafkaConsumer.getEndpoint().getConfiguration(); + } + + public void processAsyncCommits() { + while (!asyncCommits.isEmpty()) { + asyncCommits.poll().processAsyncCommit(); + } + } + + @Override + public KafkaManualCommit getManualCommit( + Exchange exchange, TopicPartition partition, ConsumerRecord<Object, Object> record) { + KafkaManualCommitFactory manualCommitFactory = kafkaConsumer.getEndpoint().getKafkaManualCommitFactory(); + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + long commitTimeoutMs = configuration.getCommitTimeoutMs(); + + return manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId, + offsetRepository, partition, record.offset(), commitTimeoutMs, asyncCommits); + } + + @Override + public void commit() { + processAsyncCommits(); + if (kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) { + if ("async".equals(configuration.getAutoCommitOnStop())) { + LOG.info("Auto commitAsync on stop {} from {}", threadId, printableTopic); + consumer.commitAsync(); + } else if ("sync".equals(configuration.getAutoCommitOnStop())) { + LOG.info("Auto commitSync on stop {} from {}", threadId, printableTopic); + consumer.commitSync(); + } else if ("none".equals(configuration.getAutoCommitOnStop())) { + LOG.info("Auto commit on stop {} from {} is disabled (none)", threadId, printableTopic); + } + } + } + + @Override + public void commitOffset(TopicPartition partition, long partitionLastOffset) { + if (partitionLastOffset == START_OFFSET) { + return; + } + + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + + if (!configuration.isAllowManualCommit() && offsetRepository != null) { + saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository); + } + } + + @Override + public void commitOffsetOnStop(TopicPartition partition, long partitionLastOffset) { + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + + if (!configuration.isAllowManualCommit() && offsetRepository != null) { + saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository); + } else { + // if we are stopping then react according to the configured option + if ("async".equals(configuration.getAutoCommitOnStop())) { + commitAsync(consumer, partition, partitionLastOffset); + } else if ("sync".equals(configuration.getAutoCommitOnStop())) { + commitSync(configuration, consumer, partition, partitionLastOffset); + + } else if ("none".equals(configuration.getAutoCommitOnStop())) { + noCommit(partition); + } + } + } + + @Override + public void commitOffsetForce(TopicPartition partition, long partitionLastOffset) { + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + + if (!configuration.isAllowManualCommit() && offsetRepository != null) { + saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository); + } else { + forceSyncCommit(configuration, consumer, partition, partitionLastOffset); + } + } + + private void commitOffset( + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, + long partitionLastOffset) { + long timeout = configuration.getCommitTimeoutMs(); + consumer.commitSync( + Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), + Duration.ofMillis(timeout)); + } + + private void forceSyncCommit( + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) { + if (LOG.isDebugEnabled()) { + LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), + partition.partition(), partitionLastOffset); + } + + commitOffset(configuration, consumer, partition, partitionLastOffset); + } + + private void noCommit(TopicPartition partition) { + if (LOG.isDebugEnabled()) { + LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic()); + } + } + + private void commitSync( + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Auto commitSync on stop {} from topic {}", threadId, partition.topic()); + } + + commitOffset(configuration, consumer, partition, partitionLastOffset); + } + + private void commitAsync(Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic()); + } + + consumer.commitAsync( + Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null); + } + + private void saveStateToOffsetRepository( + TopicPartition partition, long partitionLastOffset, + StateRepository<String, String> offsetRepository) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), + partition.partition(), + partitionLastOffset); + } + offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset)); + } + + private static String serializeOffsetKey(TopicPartition topicPartition) { + return topicPartition.topic() + '/' + topicPartition.partition(); + } + + private static String serializeOffsetValue(long offset) { + return String.valueOf(offset); + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java index af33b0f..c976a54 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java @@ -19,7 +19,6 @@ package org.apache.camel.component.kafka.consumer; import java.util.Collection; import java.util.Collections; -import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -55,7 +54,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl protected void commitAsyncOffset( StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) { - if (recordOffset != KafkaRecordProcessor.START_OFFSET) { + if (recordOffset != DefaultCommitManager.START_OFFSET) { if (offsetRepository != null) { offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset)); } else { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java index cc8442a..792c798 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommitFactory.java @@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer; import java.util.Collection; import org.apache.camel.Exchange; -import org.apache.camel.component.kafka.KafkaManualCommit; -import org.apache.camel.component.kafka.KafkaManualCommitFactory; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java index dd92ee0..95da58c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.kafka.consumer; -import org.apache.camel.component.kafka.KafkaManualCommit; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java index c8670ff..4b8041e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommitFactory.java @@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer; import java.util.Collection; import org.apache.camel.Exchange; -import org.apache.camel.component.kafka.KafkaManualCommit; -import org.apache.camel.component.kafka.KafkaManualCommitFactory; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java index 5c86240..cefb68a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualSyncCommit.java @@ -19,8 +19,6 @@ package org.apache.camel.component.kafka.consumer; import java.time.Duration; import java.util.Collections; -import org.apache.camel.component.kafka.KafkaManualCommit; -import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -46,7 +44,7 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple } protected void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset) { - if (recordOffset != KafkaRecordProcessor.START_OFFSET) { + if (recordOffset != DefaultCommitManager.START_OFFSET) { if (offsetRepository != null) { offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(recordOffset)); } else { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java index 3984998..eff9eea 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaAsyncManualCommit.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.kafka.consumer; -import org.apache.camel.component.kafka.KafkaManualCommit; - /** * Can be used for forcing async manual offset commit when using Kafka consumer. */ diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java similarity index 95% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java index 5ed94f8..b85c073 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommit.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kafka; +package org.apache.camel.component.kafka.consumer; /** * Can be used for forcing manual offset commit when using Kafka consumer. diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java similarity index 92% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java index daa1213..10d8893 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/KafkaManualCommitFactory.java @@ -14,12 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kafka; +package org.apache.camel.component.kafka.consumer; import java.util.Collection; import org.apache.camel.Exchange; -import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index b83fe4d..df288d7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -16,55 +16,38 @@ */ package org.apache.camel.component.kafka.consumer.support; -import java.time.Duration; -import java.util.Collections; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.StreamSupport; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.component.kafka.KafkaManualCommit; -import org.apache.camel.component.kafka.KafkaManualCommitFactory; +import org.apache.camel.component.kafka.consumer.CommitManager; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HeaderFilterStrategy; -import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaRecordProcessor { - public static final long START_OFFSET = -1; private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessor.class); private final boolean autoCommitEnabled; private final KafkaConfiguration configuration; private final Processor processor; - private final Consumer<?, ?> consumer; - private final KafkaManualCommitFactory manualCommitFactory; - private final String threadId; - private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits; + private final CommitManager commitManager; - public KafkaRecordProcessor(KafkaConfiguration configuration, - Processor processor, Consumer<?, ?> consumer, - KafkaManualCommitFactory manualCommitFactory, - String threadId, ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) { + public KafkaRecordProcessor(KafkaConfiguration configuration, Processor processor, CommitManager commitManager) { this.autoCommitEnabled = configuration.isAutoCommitEnable(); this.configuration = configuration; this.processor = processor; - this.consumer = consumer; - this.manualCommitFactory = manualCommitFactory; - this.threadId = threadId; - this.asyncCommits = asyncCommits; + this.commitManager = commitManager; } private void setupExchangeMessage(Message message, ConsumerRecord record) { @@ -115,11 +98,9 @@ public class KafkaRecordProcessor { } if (configuration.isAllowManualCommit()) { - StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); - // allow Camel users to access the Kafka consumer API to be able to do for example manual commits - KafkaManualCommit manual = manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId, - offsetRepository, partition, record.offset(), configuration.getCommitTimeoutMs(), asyncCommits); + KafkaManualCommit manual = commitManager.getManualCommit(exchange, partition, record); + message.setHeader(KafkaConstants.MANUAL_COMMIT, manual); message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext && !partitionHasNext); } @@ -151,7 +132,7 @@ public class KafkaRecordProcessor { } // force commit, so we resume on next poll where we failed - commitOffset(partition, partitionLastOffset, false, true); + commitManager.commitOffsetForce(partition, partitionLastOffset); // continue to next partition return true; @@ -163,106 +144,10 @@ public class KafkaRecordProcessor { return false; } - public void commitOffset( - TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit) { - commitOffset(configuration, consumer, partition, partitionLastOffset, stopping, forceCommit, threadId); - } - - public static void commitOffset( - KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, - boolean stopping, boolean forceCommit, String threadId) { - - if (partitionLastOffset == START_OFFSET) { - return; - } - - StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); - - if (!configuration.isAllowManualCommit() && offsetRepository != null) { - saveStateToOffsetRepository(partition, partitionLastOffset, threadId, offsetRepository); - } else if (stopping) { - // if we are stopping then react according to the configured option - if ("async".equals(configuration.getAutoCommitOnStop())) { - commitAsync(consumer, partition, partitionLastOffset, threadId); - } else if ("sync".equals(configuration.getAutoCommitOnStop())) { - commitSync(configuration, consumer, partition, partitionLastOffset, threadId); - - } else if ("none".equals(configuration.getAutoCommitOnStop())) { - noCommit(partition, threadId); - } - } else if (forceCommit) { - forceSyncCommit(configuration, consumer, partition, partitionLastOffset, threadId); - } - } - - private static void commitOffset( - KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, - long partitionLastOffset) { - long timeout = configuration.getCommitTimeoutMs(); - consumer.commitSync( - Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), - Duration.ofMillis(timeout)); - } - - private static void forceSyncCommit( - KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, - String threadId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), - partition.partition(), partitionLastOffset); - } - - commitOffset(configuration, consumer, partition, partitionLastOffset); - } - - private static void noCommit(TopicPartition partition, String threadId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Auto commit on stop {} from topic {} is disabled (none)", threadId, partition.topic()); - } - } - - private static void commitSync( - KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, - String threadId) { - - if (LOG.isDebugEnabled()) { - LOG.debug("Auto commitSync on stop {} from topic {}", threadId, partition.topic()); - } - - commitOffset(configuration, consumer, partition, partitionLastOffset); - } - - private static void commitAsync( - Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) { - - if (LOG.isDebugEnabled()) { - LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic()); - } - - consumer.commitAsync( - Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)), null); - } - - private static void saveStateToOffsetRepository( - TopicPartition partition, long partitionLastOffset, String threadId, - StateRepository<String, String> offsetRepository) { - - if (LOG.isDebugEnabled()) { - LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), - partition.partition(), - partitionLastOffset); - } - offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset)); - } - public static String serializeOffsetKey(TopicPartition topicPartition) { return topicPartition.topic() + '/' + topicPartition.partition(); } - public static String serializeOffsetValue(long offset) { - return String.valueOf(offset); - } - public static long deserializeOffsetValue(String offset) { return Long.parseLong(offset); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index 9d77581..10d893e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -21,11 +21,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.camel.Exchange; -import org.apache.camel.component.kafka.consumer.KafkaAsyncManualCommit; import org.apache.camel.component.kafka.KafkaConsumer; +import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -41,15 +40,17 @@ public class KafkaRecordProcessorFacade { private final Map<String, Long> lastProcessedOffset; private final String threadId; private final KafkaRecordProcessor kafkaRecordProcessor; + private final CommitManager commitManager; public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer, Map<String, Long> lastProcessedOffset, String threadId, - org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, - ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) { + CommitManager commitManager) { this.camelKafkaConsumer = camelKafkaConsumer; this.lastProcessedOffset = lastProcessedOffset; this.threadId = threadId; + this.commitManager = commitManager; + + kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager); - kafkaRecordProcessor = buildKafkaRecordProcessor(consumer, asyncCommits); } private boolean isStopping() { @@ -83,7 +84,7 @@ public class KafkaRecordProcessorFacade { if (!lastResult.isBreakOnErrorHit()) { LOG.debug("Committing offset on successful execution"); // all records processed from partition so commit them - kafkaRecordProcessor.commitOffset(partition, lastResult.getPartitionLastOffset(), false, false); + commitManager.commitOffset(partition, lastResult.getPartitionLastOffset()); } } @@ -136,13 +137,10 @@ public class KafkaRecordProcessorFacade { } } - private KafkaRecordProcessor buildKafkaRecordProcessor( - org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, - ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) { + private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) { return new KafkaRecordProcessor( camelKafkaConsumer.getEndpoint().getConfiguration(), camelKafkaConsumer.getProcessor(), - consumer, - camelKafkaConsumer.getEndpoint().getKafkaManualCommitFactory(), threadId, asyncCommits); + commitManager); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index 51854b1..b034293 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.function.Supplier; import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; @@ -38,15 +39,17 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private final Consumer consumer; private final Map<String, Long> lastProcessedOffset; private final KafkaConsumerResumeStrategy resumeStrategy; + private final CommitManager commitManager; private Supplier<Boolean> stopStateSupplier; public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, Consumer consumer, Map<String, Long> lastProcessedOffset, - Supplier<Boolean> stopStateSupplier) { + Supplier<Boolean> stopStateSupplier, CommitManager commitManager) { this.threadId = threadId; this.configuration = configuration; this.consumer = consumer; this.lastProcessedOffset = lastProcessedOffset; + this.commitManager = commitManager; this.stopStateSupplier = stopStateSupplier; this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration); @@ -69,7 +72,12 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { try { // only commit offsets if the component has control if (configuration.getAutoCommitEnable()) { - KafkaRecordProcessor.commitOffset(configuration, consumer, partition, offset, stopping, false, threadId); + if (stopping) { + commitManager.commitOffsetOnStop(partition, offset); + } else { + commitManager.commitOffset(partition, offset); + } + } } catch (Exception e) { LOG.error("Error saving offset repository state {} from offsetKey {} with offset: {}", threadId, offsetKey, diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java index a413a4a..79934d0 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java @@ -17,8 +17,10 @@ package org.apache.camel.component.kafka.consumer.support; +import org.apache.camel.component.kafka.consumer.DefaultCommitManager; + public final class ProcessingResult { - private static final ProcessingResult UNPROCESSED_RESULT = new ProcessingResult(false, KafkaRecordProcessor.START_OFFSET); + private static final ProcessingResult UNPROCESSED_RESULT = new ProcessingResult(false, DefaultCommitManager.START_OFFSET); private final boolean breakOnErrorHit; private final long partitionLastOffset; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java index 960b198..6f2a642 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java @@ -25,10 +25,10 @@ import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; import org.apache.camel.builder.AggregationStrategies; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory; import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.component.kafka.KafkaManualCommit; -import org.apache.camel.component.kafka.KafkaManualCommitFactory; +import org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; import org.apache.camel.component.mock.MockEndpoint; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterEach; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java index e430ea3..5334aa5 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java @@ -23,7 +23,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; -import org.apache.camel.component.kafka.KafkaManualCommit; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.mock.MockEndpoint; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterEach; diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc index 6632c5e..c423441 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_15.adoc @@ -181,6 +181,8 @@ The following classes were moved from `org.apache.camel.component.kafka` to `org * DefaultKafkaManualSyncCommit * DefaultKafkaManualSyncCommitFactory * KafkaAsyncManualCommit +* KafkaManualCommit +* KafkaManualCommitFactory === camel-karaf diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index 2ce40d2..b608f8a 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -21,6 +21,7 @@ import org.apache.camel.Component; import org.apache.camel.builder.component.AbstractComponentBuilder; import org.apache.camel.builder.component.ComponentBuilder; import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; /** * Sent and receive messages to/from an Apache Kafka broker. @@ -789,7 +790,7 @@ public interface KafkaComponentBuilderFactory { * box. * * The option is a: - * <code>org.apache.camel.component.kafka.KafkaManualCommitFactory</code> type. + * <code>org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory</code> type. * * Group: consumer (advanced) * @@ -797,7 +798,7 @@ public interface KafkaComponentBuilderFactory { * @return the dsl builder */ default KafkaComponentBuilder kafkaManualCommitFactory( - org.apache.camel.component.kafka.KafkaManualCommitFactory kafkaManualCommitFactory) { + KafkaManualCommitFactory kafkaManualCommitFactory) { doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory); return this; } @@ -2046,7 +2047,7 @@ public interface KafkaComponentBuilderFactory { case "specificAvroReader": getOrCreateConfiguration((KafkaComponent) component).setSpecificAvroReader((boolean) value); return true; case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) component).setTopicIsPattern((boolean) value); return true; case "valueDeserializer": getOrCreateConfiguration((KafkaComponent) component).setValueDeserializer((java.lang.String) value); return true; - case "kafkaManualCommitFactory": ((KafkaComponent) component).setKafkaManualCommitFactory((org.apache.camel.component.kafka.KafkaManualCommitFactory) value); return true; + case "kafkaManualCommitFactory": ((KafkaComponent) component).setKafkaManualCommitFactory((KafkaManualCommitFactory) value); return true; case "pollExceptionStrategy": ((KafkaComponent) component).setPollExceptionStrategy((org.apache.camel.component.kafka.PollExceptionStrategy) value); return true; case "bufferMemorySize": getOrCreateConfiguration((KafkaComponent) component).setBufferMemorySize((java.lang.Integer) value); return true; case "compressionCodec": getOrCreateConfiguration((KafkaComponent) component).setCompressionCodec((java.lang.String) value); return true; @@ -2115,4 +2116,4 @@ public interface KafkaComponentBuilderFactory { } } } -} \ No newline at end of file +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index b1ffc6a..984cd5a 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -16,15 +16,14 @@ */ package org.apache.camel.builder.endpoint.dsl; -import java.util.*; import java.util.Map; import java.util.concurrent.*; -import java.util.function.*; -import java.util.stream.*; + import javax.annotation.Generated; import org.apache.camel.builder.EndpointConsumerBuilder; import org.apache.camel.builder.EndpointProducerBuilder; import org.apache.camel.builder.endpoint.AbstractEndpointBuilder; +import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; /** * Sent and receive messages to/from an Apache Kafka broker. @@ -1858,7 +1857,7 @@ public interface KafkaEndpointBuilderFactory { * box. * * The option is a: - * <code>org.apache.camel.component.kafka.KafkaManualCommitFactory</code> type. + * <code>org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory</code> type. * * Group: consumer (advanced) * @@ -1866,7 +1865,7 @@ public interface KafkaEndpointBuilderFactory { * @return the dsl builder */ default AdvancedKafkaEndpointConsumerBuilder kafkaManualCommitFactory( - org.apache.camel.component.kafka.KafkaManualCommitFactory kafkaManualCommitFactory) { + KafkaManualCommitFactory kafkaManualCommitFactory) { doSetProperty("kafkaManualCommitFactory", kafkaManualCommitFactory); return this; } @@ -1878,7 +1877,7 @@ public interface KafkaEndpointBuilderFactory { * box. * * The option will be converted to a - * <code>org.apache.camel.component.kafka.KafkaManualCommitFactory</code> type. + * <code>org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory</code> type. * * Group: consumer (advanced) * @@ -4728,4 +4727,4 @@ public interface KafkaEndpointBuilderFactory { } return new KafkaEndpointBuilderImpl(path); } -} \ No newline at end of file +}