This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kd
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ffc0c9ea2309e913d630cc384a2fd3c7e1a35b48
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Mar 11 20:46:40 2026 +0100

    CAMEL-23023: camel-kafka - Kafka consumers are started eager before 
CamelContext is fully started
---
 .../camel/component/kafka/KafkaComponent.java      | 32 +++++++++++++++++++++-
 .../camel/component/kafka/KafkaConsumer.java       |  8 +++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 347514dcd247..704273acf9ca 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,9 +17,12 @@
 package org.apache.camel.component.kafka;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedStartupListener;
 import org.apache.camel.SSLContextParametersAware;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
@@ -33,9 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Component("kafka")
-public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware {
+public class KafkaComponent extends HealthCheckComponent implements 
SSLContextParametersAware, ExtendedStartupListener {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaComponent.class);
 
+    private final List<Runnable> pendingConsumers = new 
CopyOnWriteArrayList<>();
+
     @Metadata
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @Metadata(label = "security", defaultValue = "false")
@@ -104,6 +110,10 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         return endpoint;
     }
 
+    /**
+     * Registers a task whose execution is deferred until the pending consumers are started.
+     *
+     * @param task the deferred work to run later
+     */
+    void pendingConsumer(Runnable task) {
+        this.pendingConsumers.add(task);
+    }
+
     public KafkaConfiguration getConfiguration() {
         return configuration;
     }
@@ -244,6 +254,26 @@ public class KafkaComponent extends HealthCheckComponent 
implements SSLContextPa
         this.subscribeConsumerTopicMustExists = 
subscribeConsumerTopicMustExists;
     }
 
+    /**
+     * Startup callback: starts the pending consumers only when {@code alreadyStarted} is {@code true}.
+     * Otherwise nothing is done here and the pending consumers are left for
+     * {@link #onCamelContextFullyStarted(CamelContext, boolean)}.
+     */
+    @Override
+    public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+        if (!alreadyStarted) {
+            return;
+        }
+        startPendingConsumers();
+    }
+
+    /**
+     * Startup callback: unconditionally starts any pending consumers, regardless of
+     * {@code alreadyStarted}.
+     */
+    @Override
+    public void onCamelContextFullyStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+        this.startPendingConsumers();
+    }
+
+    /**
+     * Runs every task that was deferred via {@link #pendingConsumer(Runnable)}.
+     * <p>
+     * The tasks are taken off the pending list <em>before</em> they are run. This method is invoked
+     * from both startup callbacks, so removing first ensures that a later invocation does not run
+     * tasks again if one of them throws. It also ensures that a task registered while this method
+     * is running stays queued instead of being discarded without ever having been run.
+     */
+    private void startPendingConsumers() {
+        if (pendingConsumers.isEmpty()) {
+            return;
+        }
+        // snapshot, then remove exactly the snapshotted entries: a blanket clear() after the loop
+        // would also drop tasks that were added concurrently and never executed
+        List<Runnable> tasks = List.copyOf(pendingConsumers);
+        pendingConsumers.removeAll(tasks);
+        LOG.info("Starting {} pending Kafka consumers as CamelContext is fully started", tasks.size());
+        tasks.forEach(Runnable::run);
+    }
+
     @Override
     protected void doInit() throws Exception {
         super.doInit();
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52cff9dc06b8..98f18d942c60 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -170,7 +170,13 @@ public class KafkaConsumer extends DefaultConsumer
             KafkaFetchRecords task = new KafkaFetchRecords(
                     this, bridge, topic, pattern, Integer.toString(i), 
getProps(), consumerListener);
 
-            executor.submit(task);
+            if (!endpoint.getCamelContext().isStarted()) {
+                // if Camel has not been fully started yet, then delay starting this consumer
+                // to avoid processing incoming messages before Camel is fully started
+                endpoint.getComponent().pendingConsumer(() -> 
executor.submit(task));
+            } else {
+                executor.submit(task);
+            }
 
             tasks.add(task);
         }

Reply via email to