This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 26086db62a3667f19cfbe34f0acdec25790bcc90 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Thu Jun 16 17:02:32 2022 +0200 CAMEL-18128: delay the initialization of the producer/consumer This should allow certain parameters of the client classes to be adjusted according to the adapter --- .../resume/kafka/SingleNodeKafkaResumeStrategy.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 206293826f7..71d8ad26781 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -178,6 +178,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka @Override public void updateLastOffset(T offset) throws Exception { + createProducer(); + OffsetKey<?> key = offset.getOffsetKey(); Offset<?> offsetValue = offset.getLastOffset(); @@ -197,6 +199,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * @throws Exception */ public void loadCache() throws Exception { + createConsumer(); + subscribe(); LOG.debug("Loading records from topic {}", topic); @@ -391,17 +395,21 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka @Override public void init() { - LOG.debug("Initializing the Kafka resume strategy"); - if (consumer == null) { - consumer = new KafkaConsumer<>(consumerConfig); - } + } + private void createProducer() { if (producer == null) { producer = new KafkaProducer<>(producerConfig); } } + private void createConsumer() { + if (consumer == null) { + consumer = new KafkaConsumer<>(consumerConfig); + } + } + @Override 
public void stop() { LOG.info("Closing the Kafka producer");