This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0992cbc2b28bb6bc385a7a4fe3d7eb16c6558fcc Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Fri Feb 4 09:18:48 2022 +0100 CAMEL-15562: cleanup the creation of the strategy factories --- .../apache/camel/component/kafka/KafkaFetchRecords.java | 10 ++++++---- .../consumer/support/PartitionAssignmentListener.java | 14 +++----------- .../kafka/consumer/support/ResumeStrategyFactory.java | 15 ++++++++++++++- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index deeef3c..91728e3 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -32,6 +32,7 @@ import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStra import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade; import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener; import org.apache.camel.component.kafka.consumer.support.ProcessingResult; +import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.util.IOHelper; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -137,12 +138,13 @@ class KafkaFetchRecords implements Runnable { } private void subscribe() { - KafkaConsumerResumeStrategy userProvidedStrategy - = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class); + + KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer); + resumeStrategy.setConsumer(consumer); PartitionAssignmentListener listener = new PartitionAssignmentListener( - threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, - this::isRunnable, commitManager, userProvidedStrategy); + threadId, kafkaConsumer.getEndpoint().getConfiguration(), lastProcessedOffset, + this::isRunnable, commitManager, resumeStrategy); if (LOG.isInfoEnabled()) { LOG.info("Subscribing {} to {}", threadId, getPrintableTopic()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index fe19147..482f804 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.consumer.CommitManager; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -41,10 +40,10 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private final Map<String, Long> lastProcessedOffset; private final KafkaConsumerResumeStrategy resumeStrategy; private final CommitManager commitManager; - private Supplier<Boolean> stopStateSupplier; + private final Supplier<Boolean> stopStateSupplier; public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, - Consumer consumer, Map<String, Long> lastProcessedOffset, + Map<String, Long> lastProcessedOffset, Supplier<Boolean> stopStateSupplier, CommitManager commitManager, KafkaConsumerResumeStrategy resumeStrategy) { this.threadId = threadId; @@ -52,15 +51,8 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { this.lastProcessedOffset = lastProcessedOffset; this.commitManager = commitManager; this.stopStateSupplier = stopStateSupplier; + this.resumeStrategy = resumeStrategy; - if (resumeStrategy == null) { - LOG.info("No resume strategy was provided ... checking for builtins ..."); - this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration); - } else { - LOG.info("Using user-provided strategy"); - this.resumeStrategy = resumeStrategy; - } - resumeStrategy.setConsumer(consumer); } @Override diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java index 67707de..c2648ce 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java @@ -17,6 +17,7 @@ package org.apache.camel.component.kafka.consumer.support; import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaConsumer; import org.apache.camel.spi.StateRepository; import org.apache.kafka.clients.consumer.Consumer; import org.slf4j.Logger; @@ -53,8 +54,19 @@ public final class ResumeStrategyFactory { private ResumeStrategyFactory() { } - public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConfiguration configuration) { + public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) { + // When using resumable routes, which register the strategy via service, it takes priority over everything else + KafkaConsumerResumeStrategy resumableRouteStrategy + = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class); + + if (resumableRouteStrategy != null) { + return resumableRouteStrategy; + } + + KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration(); + if (configuration.getResumeStrategy() != null) { + LOG.info("Using user-provided strategy"); return configuration.getResumeStrategy(); } @@ -62,6 +74,7 @@ public final class ResumeStrategyFactory { } private static KafkaConsumerResumeStrategy builtinResumeStrategies(KafkaConfiguration configuration) { + LOG.debug("No resume strategy was provided ... checking for built-ins ..."); StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); String seekTo = configuration.getSeekTo();
