This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3a74f8fc2d5969965710fa11bf1802c78f5eba4b Author: Claus Ibsen <[email protected]> AuthorDate: Wed Dec 1 09:55:45 2021 +0100 CAMEL-17255: camel-kafka - Use kafka interfaces for kafka producer and kafka consumer so the factory can create other implementations, such as mocks for testing. --- .../camel/component/kafka/DefaultKafkaClientFactory.java | 8 ++++---- .../component/kafka/DefaultKafkaManualAsyncCommit.java | 4 ++-- .../kafka/DefaultKafkaManualAsyncCommitFactory.java | 4 ++-- .../camel/component/kafka/DefaultKafkaManualCommit.java | 8 ++++---- .../component/kafka/DefaultKafkaManualCommitFactory.java | 4 ++-- .../component/kafka/DefaultKafkaManualSyncCommit.java | 4 ++-- .../apache/camel/component/kafka/KafkaClientFactory.java | 14 +++++++------- .../apache/camel/component/kafka/KafkaFetchRecords.java | 2 +- .../camel/component/kafka/KafkaManualCommitFactory.java | 4 ++-- .../org/apache/camel/component/kafka/KafkaProducer.java | 6 +++--- .../consumer/support/KafkaConsumerResumeStrategy.java | 6 ++++-- .../kafka/consumer/support/KafkaRecordProcessor.java | 16 ++++++++-------- .../support/OffsetKafkaConsumerResumeStrategy.java | 7 ++++--- .../consumer/support/PartitionAssignmentListener.java | 6 +++--- .../kafka/consumer/support/ResumeStrategyFactory.java | 4 ++-- .../support/SeekPolicyKafkaConsumerResumeStrategy.java | 5 +++-- .../apache/camel/component/kafka/KafkaProducerTest.java | 4 ++-- .../integration/KafkaConsumerWithResumeStrategyIT.java | 4 ++-- 18 files changed, 57 insertions(+), 53 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java index 425788e..8b06f5b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaClientFactory.java @@ -19,18 +19,18 @@ package org.apache.camel.component.kafka; import java.util.Properties; import org.apache.camel.util.ObjectHelper; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; public class DefaultKafkaClientFactory implements KafkaClientFactory { @Override - public KafkaProducer getProducer(Properties kafkaProps) { + public Producer getProducer(Properties kafkaProps) { return new org.apache.kafka.clients.producer.KafkaProducer(kafkaProps); } @Override - public KafkaConsumer getConsumer(Properties kafkaProps) { + public Consumer getConsumer(Properties kafkaProps) { return new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java index 4db0dfb..e9765fe 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.java @@ -21,7 +21,7 @@ import java.util.Collections; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -33,7 +33,7 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl private final Collection<KafkaAsyncManualCommit> asyncCommits; - public DefaultKafkaManualAsyncCommit(KafkaConsumer consumer, String topicName, String threadId, + public DefaultKafkaManualAsyncCommit(Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java index 43ff1c2..51dc58d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommitFactory.java @@ -20,14 +20,14 @@ import java.util.Collection; import org.apache.camel.Exchange; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; public class DefaultKafkaManualAsyncCommitFactory implements KafkaManualCommitFactory { @Override public KafkaManualCommit newInstance( - Exchange exchange, KafkaConsumer consumer, String topicName, String threadId, + Exchange exchange, Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) { return new DefaultKafkaManualAsyncCommit( diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java index ea6581d..aee6ed6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommit.java @@ -17,12 +17,12 @@ package org.apache.camel.component.kafka; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { - private final KafkaConsumer consumer; + private final Consumer consumer; private final String topicName; private final String threadId; private final StateRepository<String, String> offsetRepository; @@ -30,7 +30,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { private final long recordOffset; private final long commitTimeout; - public DefaultKafkaManualCommit(KafkaConsumer consumer, String topicName, String threadId, + public DefaultKafkaManualCommit(Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout) { this.consumer = consumer; @@ -55,7 +55,7 @@ public abstract class DefaultKafkaManualCommit implements KafkaManualCommit { return String.valueOf(offset); } - public KafkaConsumer getConsumer() { + public Consumer getConsumer() { return consumer; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java index 4080485..0fefc4e 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualCommitFactory.java @@ -20,14 +20,14 @@ import java.util.Collection; import org.apache.camel.Exchange; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; public class DefaultKafkaManualCommitFactory implements KafkaManualCommitFactory { @Override public KafkaManualCommit newInstance( - Exchange exchange, KafkaConsumer consumer, String topicName, String threadId, + Exchange exchange, Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits) { return new DefaultKafkaManualSyncCommit( diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java index 83b2862..7d5dd86 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaManualSyncCommit.java @@ -21,7 +21,7 @@ import java.util.Collections; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -31,7 +31,7 @@ public class DefaultKafkaManualSyncCommit extends DefaultKafkaManualCommit imple private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualSyncCommit.class); - public DefaultKafkaManualSyncCommit(KafkaConsumer consumer, String topicName, String threadId, + public DefaultKafkaManualSyncCommit(Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout) { super(consumer, topicName, threadId, offsetRepository, partition, recordOffset, commitTimeout); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java index d7de678..8ee33506 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaClientFactory.java @@ -18,29 +18,29 @@ package org.apache.camel.component.kafka; import java.util.Properties; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; /** - * Factory to create a new {@link KafkaConsumer} and {@link KafkaProducer} instances. + * Factory to create a new Kafka {@link Consumer} and Kafka {@link Producer} instances. */ public interface KafkaClientFactory { /** - * Creates a new instance of the {@link KafkaProducer} class. + * Creates a new instance of the Kafka {@link Producer} class. * * @param kafkaProps The producer configs. * @return an instance of Kafka producer. */ - KafkaProducer getProducer(Properties kafkaProps); + Producer getProducer(Properties kafkaProps); /** - * Creates a new instance of the {@link KafkaConsumer} class. + * Creates a new instance of the Kafka {@link Consumer} class. * * @param kafkaProps The consumer configs. * @return an instance of Kafka consumer. */ - KafkaConsumer getConsumer(Properties kafkaProps); + Consumer getConsumer(Properties kafkaProps); /** * URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 61fb686..2a487d1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -49,7 +49,7 @@ class KafkaFetchRecords implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class); private final KafkaConsumer kafkaConsumer; - private org.apache.kafka.clients.consumer.KafkaConsumer consumer; + private org.apache.kafka.clients.consumer.Consumer consumer; private final String topicName; private final Pattern topicPattern; private final String threadId; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java index d0b9302..b0f6144 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaManualCommitFactory.java @@ -20,7 +20,7 @@ import java.util.Collection; import org.apache.camel.Exchange; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; /** @@ -32,7 +32,7 @@ public interface KafkaManualCommitFactory { * Creates a new instance */ KafkaManualCommit newInstance( - Exchange exchange, KafkaConsumer consumer, String topicName, String threadId, + Exchange exchange, Consumer consumer, String topicName, String threadId, StateRepository<String, String> offsetRepository, TopicPartition partition, long recordOffset, long commitTimeout, Collection<KafkaAsyncManualCommit> asyncCommits); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index a89ecdd..97a6e6d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -56,7 +56,7 @@ public class KafkaProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class); @SuppressWarnings("rawtypes") - private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; + private org.apache.kafka.clients.producer.Producer kafkaProducer; private final KafkaEndpoint endpoint; private final KafkaConfiguration configuration; private ExecutorService workerPool; @@ -89,7 +89,7 @@ public class KafkaProducer extends DefaultAsyncProducer { } @SuppressWarnings("rawtypes") - public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { + public org.apache.kafka.clients.producer.Producer getKafkaProducer() { return kafkaProducer; } @@ -97,7 +97,7 @@ public class KafkaProducer extends DefaultAsyncProducer { * To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance. */ @SuppressWarnings("rawtypes") - public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) { + public void setKafkaProducer(org.apache.kafka.clients.producer.Producer kafkaProducer) { this.kafkaProducer = kafkaProducer; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java index d85579b..0deba22 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java @@ -18,13 +18,15 @@ package org.apache.camel.component.kafka.consumer.support; import org.apache.camel.ResumeStrategy; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume * processing records. */ -public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsumer<?, ?>> { +public interface KafkaConsumerResumeStrategy extends ResumeStrategy<Consumer<?, ?>> { + /** * Perform the resume operation. This runs in the scope of the Kafka Consumer thread and may run concurrently with * other consumer instances when the component is set up to use more than one of them. As such, implementations are @@ -32,5 +34,5 @@ public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaConsume * * @param consumer an instance of the KafkaConsumer which is resuming the operation */ - void resume(KafkaConsumer<?, ?> consumer); + void resume(Consumer<?, ?> consumer); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index 197a865..320cbfe 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -34,8 +34,8 @@ import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.StateRepository; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; @@ -50,7 +50,7 @@ public class KafkaRecordProcessor { private final boolean autoCommitEnabled; private final KafkaConfiguration configuration; private final Processor processor; - private final KafkaConsumer<?, ?> consumer; + private final Consumer<?, ?> consumer; private final KafkaManualCommitFactory manualCommitFactory; private final String threadId; private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits; @@ -80,7 +80,7 @@ public class KafkaRecordProcessor { } public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration configuration, - Processor processor, KafkaConsumer<?, ?> consumer, + Processor processor, Consumer<?, ?> consumer, KafkaManualCommitFactory manualCommitFactory, String threadId, ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) { this.autoCommitEnabled = autoCommitEnabled; @@ -194,7 +194,7 @@ public class KafkaRecordProcessor { } public static void commitOffset( - KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, boolean stopping, boolean forceCommit, String threadId) { if (partitionLastOffset == START_OFFSET) { @@ -221,7 +221,7 @@ public class KafkaRecordProcessor { } private static void commitOffset( - KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset) { long timeout = configuration.getCommitTimeoutMs(); consumer.commitSync( @@ -230,7 +230,7 @@ public class KafkaRecordProcessor { } private static void forceSyncCommit( - KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) { if (LOG.isDebugEnabled()) { LOG.debug("Forcing commitSync {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(), @@ -247,7 +247,7 @@ public class KafkaRecordProcessor { } private static void commitSync( - KafkaConfiguration configuration, KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, + KafkaConfiguration configuration, Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) { if (LOG.isDebugEnabled()) { @@ -258,7 +258,7 @@ public class KafkaRecordProcessor { } private static void commitAsync( - KafkaConsumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) { + Consumer<?, ?> consumer, TopicPartition partition, long partitionLastOffset, String threadId) { if (LOG.isDebugEnabled()) { LOG.debug("Auto commitAsync on stop {} from topic {}", threadId, partition.topic()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java index 7c13eb1..5e690fa 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java @@ -20,7 +20,7 @@ package org.apache.camel.component.kafka.consumer.support; import java.util.Set; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +32,7 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce * A resume strategy that uses Kafka's offset for resuming */ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy { + private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class); private final StateRepository<String, String> offsetRepository; @@ -40,7 +41,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr this.offsetRepository = offsetRepository; } - private void resumeFromOffset(final KafkaConsumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) { + private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) { // The state contains the last read offset, so you need to seek from the next one long offset = deserializeOffsetValue(offsetState) + 1; LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset); @@ -48,7 +49,7 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr } @Override - public void resume(final KafkaConsumer<?, ?> consumer) { + public void resume(final Consumer<?, ?> consumer) { Set<TopicPartition> assignments = consumer.assignment(); for (TopicPartition topicPartition : assignments) { String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index 0420041..1ffbd2f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -22,8 +22,8 @@ import java.util.Map; import java.util.function.Supplier; import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,13 +36,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private final String threadId; private final String topicName; private final KafkaConfiguration configuration; - private final KafkaConsumer consumer; + private final Consumer consumer; private final Map<String, Long> lastProcessedOffset; private final KafkaConsumerResumeStrategy resumeStrategy; private Supplier<Boolean> stopStateSupplier; public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration, - KafkaConsumer consumer, Map<String, Long> lastProcessedOffset, + Consumer consumer, Map<String, Long> lastProcessedOffset, Supplier<Boolean> stopStateSupplier) { this.threadId = threadId; this.topicName = topicName; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java index 272f467..09f304b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java @@ -19,7 +19,7 @@ package org.apache.camel.component.kafka.consumer.support; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ public final class ResumeStrategyFactory { private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy { @SuppressWarnings("unused") @Override - public void resume(KafkaConsumer<?, ?> consumer) { + public void resume(Consumer<?, ?> consumer) { // NO-OP } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java index 969f0df..1eba302 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java @@ -17,7 +17,7 @@ package org.apache.camel.component.kafka.consumer.support; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; * A resume strategy that uses Camel's seekTo configuration for resuming */ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy { + private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class); private final String seekPolicy; @@ -34,7 +35,7 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum } @Override - public void resume(final KafkaConsumer<?, ?> consumer) { + public void resume(final Consumer<?, ?> consumer) { if (seekPolicy.equals("beginning")) { LOG.debug("Seeking from the beginning of topic"); consumer.seekToBeginning(consumer.assignment()); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java index 3e287ee..9495020 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java @@ -131,7 +131,7 @@ public class KafkaProducerTest { public void processSendsMessageWithException() { endpoint.getConfiguration().setTopic("sometopic"); // setup the exception here - org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); + org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer(); Mockito.when(kp.send(any(ProducerRecord.class))).thenThrow(new ApiException()); Mockito.when(exchange.getIn()).thenReturn(in); Mockito.when(exchange.getMessage()).thenReturn(in); @@ -165,7 +165,7 @@ public class KafkaProducerTest { Mockito.when(exchange.getMessage()).thenReturn(in); // setup the exception here - org.apache.kafka.clients.producer.KafkaProducer kp = producer.getKafkaProducer(); + org.apache.kafka.clients.producer.Producer kp = producer.getKafkaProducer(); Mockito.when(kp.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new ApiException()); in.setHeader(KafkaConstants.PARTITION_KEY, 4); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java index 192a1a6..15bd79c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java @@ -25,7 +25,7 @@ import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -53,7 +53,7 @@ public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupp } @Override - public void resume(KafkaConsumer<?, ?> consumer) { + public void resume(Consumer<?, ?> consumer) { resumeCalled = true; if (consumer != null) {
