This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.18.x by this push:
     new 34dfde71b507 CAMEL-23023: camel-kafka - Kafka consumers are started 
eager before C… (#21943)
34dfde71b507 is described below

commit 34dfde71b50757f2314da545c968342eeb4c3035
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 12 10:37:09 2026 +0100

    CAMEL-23023: camel-kafka - Kafka consumers are started eager before C… 
(#21943)
    
    * CAMEL-23023: camel-kafka - Kafka consumers are started eager before 
CamelContext is fully started
---
 .../camel/component/kafka/KafkaComponent.java      | 39 +++++++++++++++++++++-
 .../camel/component/kafka/KafkaConsumer.java       |  8 ++++-
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 347514dcd247..9a8413d368f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,9 +17,12 @@
 package org.apache.camel.component.kafka;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedStartupListener;
 import org.apache.camel.SSLContextParametersAware;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
@@ -33,9 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Component("kafka")
-public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware {
+public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware, ExtendedStartupListener {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaComponent.class);
 
+    private final List<Runnable> pendingConsumers = new 
CopyOnWriteArrayList<>();
+
     @Metadata
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @Metadata(label = "security", defaultValue = "false")
@@ -104,6 +110,10 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         return endpoint;
     }
 
+    void pendingConsumer(Runnable task) {
+        pendingConsumers.add(task);
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }
@@ -244,6 +254,26 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         this.subscribeConsumerTopicMustExists = 
subscribeConsumerTopicMustExists;
     }
 
+    @Override
+    public void onCamelContextStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        if (alreadyStarted) {
+            startPendingConsumers();
+        }
+    }
+
+    @Override
+    public void onCamelContextFullyStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        startPendingConsumers();
+    }
+
+    private void startPendingConsumers() {
+        if (!pendingConsumers.isEmpty()) {
+            LOG.info("Starting {} pending Kafka consumers as CamelContext is 
fully started", pendingConsumers.size());
+            pendingConsumers.forEach(Runnable::run);
+            pendingConsumers.clear();
+        }
+    }
+
     @Override
     protected void doInit() throws Exception {
         super.doInit();
@@ -266,4 +296,11 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         PropertyBindingSupport.bindProperties(getCamelContext(), map, 
configuration.getAdditionalProperties());
         configuration.setAdditionalProperties(map);
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        pendingConsumers.clear();
+    }
+
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52cff9dc06b8..98f18d942c60 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -170,7 +170,13 @@ public class KafkaConsumer extends DefaultConsumer
             KafkaFetchRecords task = new KafkaFetchRecords(
                     this, bridge, topic, pattern, Integer.toString(i), 
getProps(), consumerListener);
 
-            executor.submit(task);
+            if (!endpoint.getCamelContext().isStarted()) {
+                // if camel has not been fully started yet then delay starting 
this consumer to avoid
+                // process incoming message before camel is fully started
+                endpoint.getComponent().pendingConsumer(() -> 
executor.submit(task));
+            } else {
+                executor.submit(task);
+            }
 
             tasks.add(task);
         }

Reply via email to