This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new de1c8f38a69 CAMEL-22295: camel-core - Consumer can set up MDC with
route id during… (#18773)
de1c8f38a69 is described below
commit de1c8f38a69ea436ced7242356b82d4383151872
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jul 30 20:02:10 2025 +0200
CAMEL-22295: camel-core - Consumer can set up MDC with route id during…
(#18773)
* CAMEL-22295: camel-core - MDC for Camel-based threads used by sources
that are sticky to a given route
---
.../aws2/kinesis/KclKinesis2Consumer.java | 2 +-
.../component/aws2/kinesis/Kinesis2Endpoint.java | 4 +-
.../camel/component/debezium/DebeziumConsumer.java | 2 +-
.../camel/component/debezium/DebeziumEndpoint.java | 4 +-
.../pubsublite/GooglePubsubLiteConsumer.java | 2 +-
.../pubsublite/GooglePubsubLiteEndpoint.java | 4 +-
.../google/pubsub/GooglePubsubConsumer.java | 2 +-
.../google/pubsub/GooglePubsubEndpoint.java | 4 +-
.../hazelcast/queue/HazelcastQueueConsumer.java | 2 +-
.../hazelcast/queue/HazelcastQueueEndpoint.java | 4 +-
.../smn/SimpleNotificationEndpoint.java | 6 --
.../apache/camel/component/jms/JmsProducer.java | 4 +-
.../camel/component/kafka/KafkaConsumer.java | 2 +-
.../camel/component/kafka/KafkaEndpoint.java | 8 +--
.../camel/component/kafka/KafkaProducer.java | 2 +-
.../kubernetes/AbstractKubernetesEndpoint.java | 4 +-
.../config_maps/KubernetesConfigMapsConsumer.java | 2 +-
.../KubernetesCustomResourcesConsumer.java | 2 +-
.../deployments/KubernetesDeploymentsConsumer.java | 2 +-
.../events/KubernetesEventsConsumer.java | 2 +-
.../kubernetes/hpa/KubernetesHPAConsumer.java | 2 +-
.../namespaces/KubernetesNamespacesConsumer.java | 2 +-
.../kubernetes/nodes/KubernetesNodesConsumer.java | 2 +-
.../kubernetes/pods/KubernetesPodsConsumer.java | 2 +-
.../KubernetesReplicationControllersConsumer.java | 2 +-
.../services/KubernetesServicesConsumer.java | 2 +-
.../OpenshiftDeploymentConfigsConsumer.java | 2 +-
.../apache/camel/component/nats/NatsConsumer.java | 2 +-
.../apache/camel/component/nats/NatsEndpoint.java | 4 +-
...TelemetryInstrumentedThreadFactoryListener.java | 2 +-
.../camel/component/pgevent/PgEventConsumer.java | 2 +-
.../camel/component/pgevent/PgEventEndpoint.java | 4 +-
.../camel/component/rocketmq/RocketMQProducer.java | 2 +-
.../apache/camel/component/sjms/SjmsProducer.java | 4 +-
.../apache/camel/spi/ExecutorServiceManager.java | 3 +-
.../impl/engine/BaseExecutorServiceManager.java | 19 +++++--
.../impl/engine/MDCThreadFactoryListener.java | 63 +++++++++++++++++++++
.../impl/DefaultExecutorServiceManagerTest.java | 4 +-
.../camel/processor/MDCRouteIdAwareTest.java | 66 ++++++++++++++++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_14.adoc | 3 +
40 files changed, 195 insertions(+), 61 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
index 3921456c2ff..8e1a25bda35 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.java
@@ -138,7 +138,7 @@ public class KclKinesis2Consumer extends DefaultConsumer {
} else {
cloudWatchAsyncClient =
getEndpoint().getConfiguration().getCloudWatchAsyncClient();
}
- this.executor = this.getEndpoint().createExecutor();
+ this.executor = this.getEndpoint().createExecutor(this);
this.executor.submit(new KclKinesisConsumingTask(
configuration.getStreamName(),
configuration.getApplicationName(), kinesisAsyncClient, dynamoDbAsyncClient,
cloudWatchAsyncClient,
configuration.isKclDisableCloudwatchMetricsExport()));
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index f7a9e1aa7a3..ca493afb553 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -157,8 +157,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint
implements EndpointS
return null;
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"KinesisStream[" + configuration.getStreamName() + "]", 1);
}
}
diff --git
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
index 48e198ee8de..8a89c7b94f3 100644
---
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
+++
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -50,7 +50,7 @@ public class DebeziumConsumer extends DefaultConsumer {
super.doStart();
// start a single threaded pool to monitor events
- executorService = endpoint.createExecutor();
+ executorService = endpoint.createExecutor(this);
// create engine
dbzEngine = createDbzEngine();
diff --git
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
index 9ca582f2ab2..46310445755 100644
---
a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
+++
b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -59,8 +59,8 @@ public abstract class DebeziumEndpoint<C extends
EmbeddedDebeziumConfiguration>
return consumer;
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(source,
"DebeziumConsumer");
}
diff --git
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
index 128def8432d..95afd73a0c7 100644
---
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
+++
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
@@ -58,7 +58,7 @@ public class GooglePubsubLiteConsumer extends DefaultConsumer
{
protected void doStart() throws Exception {
super.doStart();
localLog.info("Starting Google PubSub Lite consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- executor = endpoint.createExecutor();
+ executor = endpoint.createExecutor(this);
for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
executor.submit(new SubscriberWrapper());
}
diff --git
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
index d6d092f4d48..45ba9421723 100644
---
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
+++
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java
@@ -132,8 +132,8 @@ public class GooglePubsubLiteEndpoint extends
DefaultEndpoint implements Endpoin
return consumer;
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"GooglePubsubLiteConsumer[" + getDestinationName() + "]",
concurrentConsumers);
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 235bd536cc5..e83bebe362e 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -78,7 +78,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
super.doStart();
localLog.info("Starting Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- executor = endpoint.createExecutor();
+ executor = endpoint.createExecutor(this);
for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
executor.submit(new SubscriberWrapper());
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index cbae380335f..62a5d3ae0b3 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -142,8 +142,8 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
implements EndpointSer
return consumer;
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"GooglePubsubConsumer[" + getDestinationName() + "]",
concurrentConsumers);
}
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 856767a6712..5d98899c6fa 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -44,7 +44,7 @@ public class HazelcastQueueConsumer extends
HazelcastDefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = ((HazelcastQueueEndpoint) getEndpoint()).createExecutor();
+ executor = ((HazelcastQueueEndpoint)
getEndpoint()).createExecutor(this);
CamelItemListener camelItemListener = new CamelItemListener(this,
cacheName);
queueConsumerTask = new QueueConsumerTask(camelItemListener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
index ee7e27354ad..51794cd0c81 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueEndpoint.java
@@ -69,8 +69,8 @@ public class HazelcastQueueEndpoint extends
HazelcastDefaultEndpoint {
return new HazelcastQueueProducer(hazelcastInstance, this, cacheName);
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
"QueueConsumer",
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"QueueConsumer",
configuration.getPoolSize());
}
diff --git
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
index 8140a65ae2d..e88919cd34f 100644
---
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
+++
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationEndpoint.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.huaweicloud.smn;
-import java.util.concurrent.ExecutorService;
-
import com.huaweicloud.sdk.smn.v2.SmnClient;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
@@ -246,8 +244,4 @@ public class SimpleNotificationEndpoint extends
DefaultEndpoint {
this.smnClient = smnClient;
}
- public ExecutorService createExecutor() {
- // TODO: Delete me when you implemented your custom component
- return
getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
"SimpleNotificationConsumer");
- }
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index 3eb0ce93c62..0ce743c0138 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -577,7 +577,7 @@ public class JmsProducer extends DefaultAsyncProducer {
String name = "JmsReplyManagerTimeoutChecker[" +
getEndpoint().getEndpointConfiguredDestinationName() + "]";
ScheduledExecutorService replyManagerScheduledExecutorService
- =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
name);
+ =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
name);
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
name = "JmsReplyManagerOnTimeout[" +
getEndpoint().getEndpointConfiguredDestinationName() + "]";
@@ -610,7 +610,7 @@ public class JmsProducer extends DefaultAsyncProducer {
String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
ScheduledExecutorService replyManagerScheduledExecutorService
- =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
name);
+ =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
name);
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1d496d34a29..52cff9dc06b 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -148,7 +148,7 @@ public class KafkaConsumer extends DefaultConsumer
}
}
- executor = endpoint.createExecutor();
+ executor = endpoint.createExecutor(this);
String topic = endpoint.getConfiguration().getTopic();
Pattern pattern = null;
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 61f341ad7b3..41549edbba4 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -201,15 +201,15 @@ public class KafkaEndpoint extends DefaultEndpoint
implements MultipleConsumersS
}
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"KafkaConsumer[" + configuration.getTopic() + "]",
configuration.getConsumersCount());
}
- public ExecutorService createProducerExecutor() {
+ public ExecutorService createProducerExecutor(Object source) {
int core = getConfiguration().getWorkerPoolCoreSize();
int max = getConfiguration().getWorkerPoolMaxSize();
- return
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+ return
getCamelContext().getExecutorServiceManager().newThreadPool(source,
"KafkaProducer[" + configuration.getTopic() + "]", core, max);
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index c8af6d224a9..99b6c856865 100755
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -179,7 +179,7 @@ public class KafkaProducer extends DefaultAsyncProducer
implements RouteIdAware
workerPool = configuration.getWorkerPool();
shutdownWorkerPool = false;
} else {
- workerPool = endpoint.createProducerExecutor();
+ workerPool = endpoint.createProducerExecutor(this);
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
index 406f35ce5b2..8082a526563 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
@@ -75,8 +75,8 @@ public abstract class AbstractKubernetesEndpoint extends
DefaultEndpoint impleme
}
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
"KubernetesConsumer",
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"KubernetesConsumer",
configuration.getPoolSize());
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
index b2288699fd3..16172967e9e 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesConfigMapsConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
configMapWatcher = new ConfigMapsConsumerTask();
executor.submit(configMapWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
index 55699a7ec0d..b8cd2e3306b 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/customresources/KubernetesCustomResourcesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesCustomResourcesConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
customResourcesWatcher = new CustomResourcesConsumerTask();
executor.submit(customResourcesWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
index 3a2a7858229..10941a4beea 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/deployments/KubernetesDeploymentsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesDeploymentsConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
deploymentsWatcher = new DeploymentsConsumerTask();
executor.submit(deploymentsWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
index ed2ddb8cf3b..b9a3f329318 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/events/KubernetesEventsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesEventsConsumer extends DefaultConsumer
{
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
eventWatcher = new EventsConsumerTask();
executor.submit(eventWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
index c351853b615..927474cc68d 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/hpa/KubernetesHPAConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesHPAConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
hpasWatcher = new HpaConsumerTask();
executor.submit(hpasWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
index 67b7844f5fa..bc920a21b8d 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/namespaces/KubernetesNamespacesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesNamespacesConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
nsWatcher = new NamespacesConsumerTask();
executor.submit(nsWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
index 026f8033eb9..35e07c5b70e 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/nodes/KubernetesNodesConsumer.java
@@ -56,7 +56,7 @@ public class KubernetesNodesConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
nodesWatcher = new NodesConsumerTask();
executor.submit(nodesWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
index 17909786da6..addcc543d53 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/pods/KubernetesPodsConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesPodsConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
podsWatcher = new PodsConsumerTask();
executor.submit(podsWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
index fe3c22afc8e..9785ecaf494 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/replication_controllers/KubernetesReplicationControllersConsumer.java
@@ -58,7 +58,7 @@ public class KubernetesReplicationControllersConsumer extends
DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
rcWatcher = new ReplicationControllersConsumerTask();
executor.submit(rcWatcher);
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
index 637e08b997f..b92f165053c 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/services/KubernetesServicesConsumer.java
@@ -57,7 +57,7 @@ public class KubernetesServicesConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
servicesWatcher = new ServicesConsumerTask();
executor.submit(servicesWatcher);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
index 685fb7e11a0..b66cb579d0c 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/openshift/deploymentconfigs/OpenshiftDeploymentConfigsConsumer.java
@@ -58,7 +58,7 @@ public class OpenshiftDeploymentConfigsConsumer extends
DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- executor = getEndpoint().createExecutor();
+ executor = getEndpoint().createExecutor(this);
deploymentsWatcher = new DeploymentsConfigConsumerTask();
executor.submit(deploymentsWatcher);
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 42b5c988c5c..13c4bf5f4d2 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -64,7 +64,7 @@ public class NatsConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
LOG.debug("Starting Nats Consumer");
- this.executor = this.getEndpoint().createExecutor();
+ this.executor = this.getEndpoint().createExecutor(this);
LOG.debug("Getting Nats Connection");
this.connection =
this.getEndpoint().getConfiguration().getConnection() != null
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
index 1ffa295af1b..ed1aaa74511 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
@@ -83,8 +83,8 @@ public class NatsEndpoint extends DefaultEndpoint
return "nats";
}
- public ExecutorService createExecutor() {
- return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
+ public ExecutorService createExecutor(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newFixedThreadPool(source,
"NatsTopic[" + configuration.getTopic() + "]",
configuration.getPoolSize());
}
diff --git
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
index ef5bb9459cd..1ccde52c75c 100644
---
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
+++
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryInstrumentedThreadFactoryListener.java
@@ -26,7 +26,7 @@ import org.apache.camel.spi.annotations.JdkService;
public class OpenTelemetryInstrumentedThreadFactoryListener implements
ExecutorServiceManager.ThreadFactoryListener {
@Override
- public ThreadFactory onNewThreadFactory(ThreadFactory factory) {
+ public ThreadFactory onNewThreadFactory(Object source, ThreadFactory
factory) {
return runnable -> factory.newThread(Context.current().wrap(runnable));
}
}
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index a751c2a38ca..83c7f585630 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -63,7 +63,7 @@ public class PgEventConsumer extends DefaultConsumer {
if (endpoint.getWorkerPool() != null) {
workerPool = endpoint.getWorkerPool();
} else {
- workerPool = endpoint.createWorkerPool();
+ workerPool = endpoint.createWorkerPool(this);
shutdownWorkerPool = true;
}
// used for re-connecting to the database
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
index 8e56d15a6b8..4cf0a8dec8d 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
@@ -195,8 +195,8 @@ public class PgEventEndpoint extends DefaultEndpoint
implements EndpointServiceL
return consumer;
}
- ExecutorService createWorkerPool() {
- return
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+ ExecutorService createWorkerPool(Object source) {
+ return
getCamelContext().getExecutorServiceManager().newThreadPool(source,
"PgEventConsumer[" + channel + "]", workerPoolCoreSize,
workerPoolMaxSize);
}
diff --git
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
index 0fd0f529c69..e6a181b2cc7 100644
---
a/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
+++
b/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java
@@ -189,7 +189,7 @@ public class RocketMQProducer extends DefaultAsyncProducer {
replyManager.setEndpoint(getEndpoint());
String name = "RocketMQReplyManagerTimeoutChecker[" +
getEndpoint().getTopicName() + "]";
ScheduledExecutorService scheduledExecutorService
- =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
name);
+ =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
name);
replyManager.setScheduledExecutorService(scheduledExecutorService);
LOG.debug("Starting ReplyManager: {}", name);
ServiceHelper.startService(replyManager);
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 6ff8154ea8a..6e38956785b 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -166,7 +166,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
ScheduledExecutorService replyManagerScheduledExecutorService
- =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
name);
+ =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
name);
replyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
@@ -186,7 +186,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
String name = "JmsReplyManagerTimeoutChecker[" +
getEndpoint().getEndpointConfiguredDestinationName() + "]";
ScheduledExecutorService replyManagerScheduledExecutorService
- =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
name);
+ =
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
name);
temporaryQueueReplyManager.setScheduledExecutorService(replyManagerScheduledExecutorService);
name = "JmsReplyManagerOnTimeout[" +
getEndpoint().getEndpointConfiguredDestinationName() + "]";
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
index f2e2b1a1c83..34bdcc3826a 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/ExecutorServiceManager.java
@@ -68,10 +68,11 @@ public interface ExecutorServiceManager extends
ShutdownableService, StaticServi
* Listener when Camel has created a new {@link ThreadFactory} to be
used by this
* {@link ExecutorServiceManager}.
*
+ * @param source optional source where the thread is being used
(such as a {@link org.apache.camel.Consumer}).
* @param factory the created factory
* @return the factory to use by this {@link
ExecutorServiceManager}.
*/
- ThreadFactory onNewThreadFactory(ThreadFactory factory);
+ ThreadFactory onNewThreadFactory(Object source, ThreadFactory factory);
}
/**
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
index ee953195cd3..92a19b4e36a 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java
@@ -167,7 +167,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
@Override
public Thread newThread(String name, Runnable runnable) {
- ThreadFactory factory = createThreadFactory(name, true);
+ ThreadFactory factory = createThreadFactory(null, name, true);
return factory.newThread(runnable);
}
@@ -200,7 +200,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
profile.addDefaults(defaultProfile);
- ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
+ ThreadFactory threadFactory = createThreadFactory(source,
sanitizedName, true);
ExecutorService executorService =
threadPoolFactory.newThreadPool(profile, threadFactory);
onThreadPoolCreated(executorService, source, profile.getId());
if (LOG.isDebugEnabled()) {
@@ -226,7 +226,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
@Override
public ExecutorService newCachedThreadPool(Object source, String name) {
String sanitizedName = URISupport.sanitizeUri(name);
- ExecutorService answer =
threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
+ ExecutorService answer =
threadPoolFactory.newCachedThreadPool(createThreadFactory(source,
sanitizedName, true));
onThreadPoolCreated(answer, source, null);
if (LOG.isDebugEnabled()) {
@@ -260,7 +260,7 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
String sanitizedName = URISupport.sanitizeUri(name);
profile.addDefaults(getDefaultThreadPoolProfile());
ScheduledExecutorService answer
- = threadPoolFactory.newScheduledThreadPool(profile,
createThreadFactory(sanitizedName, true));
+ = threadPoolFactory.newScheduledThreadPool(profile,
createThreadFactory(source, sanitizedName, true));
onThreadPoolCreated(answer, source, null);
if (LOG.isDebugEnabled()) {
@@ -474,6 +474,13 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
if (!threadFactoryListeners.isEmpty()) {
threadFactoryListeners.sort(OrderedComparator.get());
}
+
+ // enrich threads for MDC logging
+ boolean usedMDCLogging = getCamelContext().isUseMDCLogging() != null
&& getCamelContext().isUseMDCLogging();
+ if (usedMDCLogging) {
+ threadFactoryListeners.add(new MDCThreadFactoryListener());
+ }
+
ServiceHelper.startService(threadPoolFactory);
}
@@ -591,10 +598,10 @@ public class BaseExecutorServiceManager extends
ServiceSupport implements Execut
onNewExecutorService(executorService);
}
- protected ThreadFactory createThreadFactory(String name, boolean daemon) {
+ protected ThreadFactory createThreadFactory(Object source, String name,
boolean daemon) {
ThreadFactory factory = new CamelThreadFactory(threadNamePattern,
name, daemon);
for (ThreadFactoryListener listener : threadFactoryListeners) {
- factory = listener.onNewThreadFactory(factory);
+ factory = listener.onNewThreadFactory(source, factory);
}
return factory;
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
new file mode 100644
index 00000000000..b4a44d54fa4
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCThreadFactoryListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.spi.ExecutorServiceManager;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.UnitOfWork;
+import org.slf4j.MDC;
+
+/**
+ * MDC {@link
org.apache.camel.spi.ExecutorServiceManager.ThreadFactoryListener} which adds
the CamelContext name and route id
+ * to the MDC of threads created for a route-bound {@link Consumer}, so that
MDC logging can pinpoint the route that logs.
+ * This makes the information available earlier, such as during the internal
work that a consumer performs before
+ * routing an {@link org.apache.camel.Exchange}, from which point the {@link
MDCUnitOfWork} would include this information.
+ */
+public class MDCThreadFactoryListener implements
ExecutorServiceManager.ThreadFactoryListener {
+
+ @Override
+ public ThreadFactory onNewThreadFactory(Object source, ThreadFactory
factory) {
+ if (source instanceof Consumer c && c instanceof RouteIdAware ra) {
+ String name = c.getEndpoint().getCamelContext().getName();
+ String routeId = ra.getRouteId();
+ if (routeId != null) {
+ return newThreadFactory(name, routeId, factory);
+ }
+ }
+ return factory;
+ }
+
+ private ThreadFactory newThreadFactory(String contextName, String routeId,
ThreadFactory tf) {
+ return task -> {
+ Runnable wrapped = () -> {
+ MDC.put(UnitOfWork.MDC_CAMEL_CONTEXT_ID, contextName);
+ MDC.put(UnitOfWork.MDC_ROUTE_ID, routeId);
+ try {
+ task.run();
+ } finally {
+ MDC.remove(UnitOfWork.MDC_CAMEL_CONTEXT_ID);
+ MDC.remove(UnitOfWork.MDC_ROUTE_ID);
+ }
+ };
+ return tf.newThread(wrapped);
+ };
+
+ }
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 0cf702024d1..91ca8e0ee9e 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -572,7 +572,7 @@ public class DefaultExecutorServiceManagerTest extends
ContextTestSupport {
// custom thread factory
ThreadFactory myFactory = r -> new Thread(r, "MyFactory");
// hook custom factory into Camel
- context.getExecutorServiceManager().addThreadFactoryListener(factory
-> myFactory);
+ context.getExecutorServiceManager().addThreadFactoryListener(((source,
factory) -> myFactory));
// create thread
Thread thread = context.getExecutorServiceManager().newThread("Cool",
() -> {
// noop
@@ -592,7 +592,7 @@ public class DefaultExecutorServiceManagerTest extends
ContextTestSupport {
// custom thread factory
ThreadFactory myFactory = r -> new Thread(r, "MyFactory2");
// hook custom factory into Camel via registry
- ExecutorServiceManager.ThreadFactoryListener listener = factory ->
myFactory;
+ ExecutorServiceManager.ThreadFactoryListener listener = (source,
factory) -> myFactory;
c.getRegistry().bind("myListener", listener);
c.start();
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
new file mode 100644
index 00000000000..d2359cca0b0
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MDCRouteIdAwareTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileFilter;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.UnitOfWork;
+import org.junit.jupiter.api.Test;
+import org.slf4j.MDC;
+
+public class MDCRouteIdAwareTest extends ContextTestSupport {
+
+ @Test
+ public void testMDC() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ template.sendBodyAndHeader(fileUri(), "Hello World",
Exchange.FILE_NAME, "hello.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // enable MDC
+ context.setUseMDCLogging(true);
+ context.getRegistry().bind("myFilter", new MyFilter());
+
+ from(fileUri("?filter=#myFilter")).routeId("myRoute")
+ .to("mock:result");
+
+ }
+ };
+ }
+
+ private class MyFilter implements GenericFileFilter {
+
+ @Override
+ public boolean accept(GenericFile file) {
+ String rid = MDC.get(UnitOfWork.MDC_ROUTE_ID);
+ String name = MDC.get(UnitOfWork.MDC_CAMEL_CONTEXT_ID);
+ return "myRoute".equals(rid) && context.getName().equals(name);
+ }
+ }
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
index 714cc383f04..da0fb875900 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_14.adoc
@@ -8,6 +8,9 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
=== camel-core
+The method signature of
`org.apache.camel.spi.ExecutorServiceManager.ThreadFactoryListener` has
changed to include the source,
+so the method has changed from `ThreadFactory onNewThreadFactory(ThreadFactory
factory)` to `ThreadFactory onNewThreadFactory(Object source, ThreadFactory
factory)`. Custom listener implementations must be updated to the new signature.
+
==== Splitter and Multicast EIPs
When using `shareUnitOfWork=true` in Split or Multicast EIPs, then Camel will
now use a single shared `UnitOfWork` instance (parent)