This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f3220a13cd61 CAMEL-23023: camel-kafka - Kafka consumers are started
eager before C… (#21943)
f3220a13cd61 is described below
commit f3220a13cd61a9e8aa028e103052a522a6e5d632
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 12 10:37:09 2026 +0100
CAMEL-23023: camel-kafka - Kafka consumers are started eager before C…
(#21943)
* CAMEL-23023: camel-kafka - Kafka consumers are started eager before
CamelContext is fully started
---
.../camel/component/kafka/KafkaComponent.java | 39 +++++++++++++++++++++-
.../camel/component/kafka/KafkaConsumer.java | 8 ++++-
.../ROOT/pages/camel-4x-upgrade-guide-4_19.adoc | 3 ++
3 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 347514dcd247..9a8413d368f4 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,9 +17,12 @@
package org.apache.camel.component.kafka;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedStartupListener;
import org.apache.camel.SSLContextParametersAware;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
@@ -33,9 +36,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component("kafka")
-public class KafkaComponent extends HealthCheckComponent implements
SSLContextParametersAware {
+public class KafkaComponent extends HealthCheckComponent implements
SSLContextParametersAware, ExtendedStartupListener {
+
private static final Logger LOG =
LoggerFactory.getLogger(KafkaComponent.class);
+ private final List<Runnable> pendingConsumers = new
CopyOnWriteArrayList<>();
+
@Metadata
private KafkaConfiguration configuration = new KafkaConfiguration();
@Metadata(label = "security", defaultValue = "false")
@@ -104,6 +110,10 @@ public class KafkaComponent extends HealthCheckComponent
implements SSLContextPa
return endpoint;
}
+ void pendingConsumer(Runnable task) {
+ pendingConsumers.add(task);
+ }
+
public KafkaConfiguration getConfiguration() {
return configuration;
}
@@ -244,6 +254,26 @@ public class KafkaComponent extends HealthCheckComponent
implements SSLContextPa
this.subscribeConsumerTopicMustExists =
subscribeConsumerTopicMustExists;
}
+ @Override
+ public void onCamelContextStarted(CamelContext context, boolean
alreadyStarted) throws Exception {
+ if (alreadyStarted) {
+ startPendingConsumers();
+ }
+ }
+
+ @Override
+ public void onCamelContextFullyStarted(CamelContext context, boolean
alreadyStarted) throws Exception {
+ startPendingConsumers();
+ }
+
+ private void startPendingConsumers() {
+ if (!pendingConsumers.isEmpty()) {
+ LOG.info("Starting {} pending Kafka consumers as CamelContext is
fully started", pendingConsumers.size());
+ pendingConsumers.forEach(Runnable::run);
+ pendingConsumers.clear();
+ }
+ }
+
@Override
protected void doInit() throws Exception {
super.doInit();
@@ -266,4 +296,11 @@ public class KafkaComponent extends HealthCheckComponent
implements SSLContextPa
PropertyBindingSupport.bindProperties(getCamelContext(), map,
configuration.getAdditionalProperties());
configuration.setAdditionalProperties(map);
}
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ pendingConsumers.clear();
+ }
+
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52cff9dc06b8..98f18d942c60 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -170,7 +170,13 @@ public class KafkaConsumer extends DefaultConsumer
KafkaFetchRecords task = new KafkaFetchRecords(
this, bridge, topic, pattern, Integer.toString(i),
getProps(), consumerListener);
- executor.submit(task);
+ if (!endpoint.getCamelContext().isStarted()) {
+ // if camel has not been fully started yet then delay starting
this consumer to avoid
+ // processing incoming messages before camel is fully started
+ endpoint.getComponent().pendingConsumer(() ->
executor.submit(task));
+ } else {
+ executor.submit(task);
+ }
tasks.add(task);
}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
index 9dce4c3d960f..4b4a6346bf66 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_19.adoc
@@ -286,6 +286,9 @@ The Kafka client library has been upgraded from 3.9.1 to
4.2.0. This is a major
* If you had explicitly configured the `partitioner` option to use
`org.apache.kafka.clients.producer.internals.DefaultPartitioner` or
`org.apache.kafka.clients.producer.UniformStickyPartitioner`, you must remove
that configuration as these classes have been removed in Kafka 4.0. The
built-in default partitioner (used when no partitioner is set) continues to
work.
+Camel now defers starting Kafka consumers during startup of `CamelContext`
until after the context is fully started, to ensure
+that incoming messages from Kafka brokers are only received by Camel when
everything is fully started.
+
=== camel-google-pubsub-lite
The `camel-google-pubsub-lite` component has been removed. The component was
deprecated in Camel 4.10 following Google Cloud Platform's deprecation of the
underlying Pub/Sub Lite service.