This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch CAMEL-23225 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 10bdb1e8e33623295865bfa159e168c58ab4e2df Author: Guillaume Nodet <[email protected]> AuthorDate: Mon Mar 23 11:32:12 2026 +0100 CAMEL-23225: Propagate OpenTelemetry context across thread boundaries in camel-opentelemetry2 Port the thread pool instrumentation from the deprecated camel-opentelemetry module to camel-opentelemetry2. This adds OpenTelemetryInstrumentedThreadPoolFactory and OpenTelemetryInstrumentedThreadFactoryListener which wrap Camel-managed thread pools and thread factories with OpenTelemetry Context propagation, preventing context leaks when work crosses thread boundaries. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../org/apache/camel/thread-factory-listener | 2 + .../services/org/apache/camel/thread-pool-factory | 2 + ...TelemetryInstrumentedThreadFactoryListener.java | 32 ++++++++++ ...OpenTelemetryInstrumentedThreadPoolFactory.java | 69 ++++++++++++++++++++++ 4 files changed, 105 insertions(+) diff --git a/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener b/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener new file mode 100644 index 000000000000..8cb98164902d --- /dev/null +++ b/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-factory-listener @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.opentelemetry2.OpenTelemetryInstrumentedThreadFactoryListener diff --git a/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory b/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory new file mode 100644 index 000000000000..31eb3a73f0bc --- /dev/null +++ b/components/camel-opentelemetry2/src/generated/resources/META-INF/services/org/apache/camel/thread-pool-factory @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.opentelemetry2.OpenTelemetryInstrumentedThreadPoolFactory diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadFactoryListener.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadFactoryListener.java new file mode 100644 index 000000000000..f2eaae4bcd06 --- /dev/null +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadFactoryListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry2; + +import java.util.concurrent.ThreadFactory; + +import io.opentelemetry.context.Context; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.annotations.JdkService; + +@JdkService(ExecutorServiceManager.ThreadFactoryListener.FACTORY) +public class OpenTelemetryInstrumentedThreadFactoryListener implements ExecutorServiceManager.ThreadFactoryListener { + + @Override + public ThreadFactory onNewThreadFactory(Object source, ThreadFactory factory) { + return runnable -> factory.newThread(Context.current().wrap(runnable)); + } +} diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadPoolFactory.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadPoolFactory.java new file mode 100644 index 000000000000..9c70dbaf1b19 --- /dev/null +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryInstrumentedThreadPoolFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry2; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.context.Context; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.DefaultThreadPoolFactory; + +@JdkService(ThreadPoolFactory.FACTORY) +public class OpenTelemetryInstrumentedThreadPoolFactory extends DefaultThreadPoolFactory { + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Context.taskWrapping(super.newCachedThreadPool(threadFactory)); + } + + @Override + public ExecutorService newThreadPool( + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit timeUnit, + int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, + ThreadFactory threadFactory) + throws IllegalArgumentException { + + ExecutorService executorService = super.newThreadPool( + corePoolSize, + maxPoolSize, + keepAliveTime, + timeUnit, + maxQueueSize, + allowCoreThreadTimeOut, + rejectedExecutionHandler, + threadFactory); + + return Context.taskWrapping(executorService); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return Context.taskWrapping(super.newScheduledThreadPool(profile, threadFactory)); + } + +}
