This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kd in repository https://gitbox.apache.org/repos/asf/camel.git
commit ffc0c9ea2309e913d630cc384a2fd3c7e1a35b48 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Mar 11 20:46:40 2026 +0100 CAMEL-23023: camel-kafka - Kafka consumers are started eager before CamelContext is fully started --- .../camel/component/kafka/KafkaComponent.java | 32 +++++++++++++++++++++- .../camel/component/kafka/KafkaConsumer.java | 8 +++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java index 347514dcd247..704273acf9ca 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java @@ -17,9 +17,12 @@ package org.apache.camel.component.kafka; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedStartupListener; import org.apache.camel.SSLContextParametersAware; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory; @@ -33,9 +36,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Component("kafka") -public class KafkaComponent extends HealthCheckComponent implements SSLContextParametersAware { +public class KafkaComponent extends HealthCheckComponent implements SSLContextParametersAware, ExtendedStartupListener { + private static final Logger LOG = LoggerFactory.getLogger(KafkaComponent.class); + private final List<Runnable> pendingConsumers = new CopyOnWriteArrayList<>(); + @Metadata private KafkaConfiguration configuration = new KafkaConfiguration(); @Metadata(label = "security", defaultValue = "false") @@ -104,6 +110,10 @@ public class KafkaComponent extends HealthCheckComponent implements SSLContextPa return endpoint; } + void pendingConsumer(Runnable task) { + pendingConsumers.add(task); + } + public KafkaConfiguration getConfiguration() { return configuration; } @@ -244,6 +254,26 @@ public class KafkaComponent extends HealthCheckComponent implements SSLContextPa this.subscribeConsumerTopicMustExists = subscribeConsumerTopicMustExists; } + @Override + public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { + if (alreadyStarted) { + startPendingConsumers(); + } + } + + @Override + public void onCamelContextFullyStarted(CamelContext context, boolean alreadyStarted) throws Exception { + startPendingConsumers(); + } + + private void startPendingConsumers() { + if (!pendingConsumers.isEmpty()) { + LOG.info("Starting {} pending Kafka consumers as CamelContext is fully started", pendingConsumers.size()); + pendingConsumers.forEach(Runnable::run); + pendingConsumers.clear(); + } + } + @Override protected void doInit() throws Exception { super.doInit(); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 52cff9dc06b8..98f18d942c60 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -170,7 +170,13 @@ public class KafkaConsumer extends DefaultConsumer KafkaFetchRecords task = new KafkaFetchRecords( this, bridge, topic, pattern, Integer.toString(i), getProps(), consumerListener); - executor.submit(task); + if (!endpoint.getCamelContext().isStarted()) { + // if camel has not been fully started yet then delay starting this consumer to avoid + // process incoming message before camel is fully started + endpoint.getComponent().pendingConsumer(() -> executor.submit(task)); + } else { + executor.submit(task); + } tasks.add(task); }
