This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 26086db62a3667f19cfbe34f0acdec25790bcc90
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Thu Jun 16 17:02:32 2022 +0200

    CAMEL-18128: delay the initialization of the producer/consumer
    
    This should allow certain parameters of the client classes to be
    adjusted according to the adapter.
---
 .../resume/kafka/SingleNodeKafkaResumeStrategy.java      | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 206293826f7..71d8ad26781 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -178,6 +178,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void updateLastOffset(T offset) throws Exception {
+        createProducer();
+
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
@@ -197,6 +199,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
+        createConsumer();
+
         subscribe();
 
         LOG.debug("Loading records from topic {}", topic);
@@ -391,17 +395,21 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void init() {
-
         LOG.debug("Initializing the Kafka resume strategy");
-        if (consumer == null) {
-            consumer = new KafkaConsumer<>(consumerConfig);
-        }
+    }
 
+    private void createProducer() {
         if (producer == null) {
             producer = new KafkaProducer<>(producerConfig);
         }
     }
 
+    private void createConsumer() {
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerConfig);
+        }
+    }
+
     @Override
     public void stop() {
         LOG.info("Closing the Kafka producer");
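A minimal, hypothetical sketch (not part of the commit) of the lazy-creation pattern this change introduces: the Kafka client is only built on first use, so its configuration can still be adjusted after init() has run. Class, method, and field names below are illustrative, not the actual Camel code.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class LazyProducerSketch {
        // Assumed to already carry bootstrap servers, serializers, etc.
        private final Properties producerConfig;
        private KafkaProducer<byte[], byte[]> producer;

        LazyProducerSketch(Properties producerConfig) {
            this.producerConfig = producerConfig;
        }

        // Mirrors the new createProducer(): build the client only when it is
        // first needed, so producerConfig can still be tuned before that point.
        private void createProducer() {
            if (producer == null) {
                producer = new KafkaProducer<>(producerConfig);
            }
        }

        void send(String topic, byte[] key, byte[] value) {
            createProducer();
            producer.send(new ProducerRecord<>(topic, key, value));
        }
    }

The same idea applies to the consumer side: loadCache() calls createConsumer() before subscribing, so the consumer configuration is only fixed at that point rather than at init().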
