This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch jmx-virtual-thread-executor in repository https://gitbox.apache.org/repos/asf/camel.git
commit e6842c0587a15eab6813cc937f1e1c8a903631d0 Author: Guillaume Nodet <[email protected]> AuthorDate: Wed Mar 4 13:23:45 2026 +0100 CAMEL-20199: Add JMX support for virtual thread executors Virtual thread executors (ThreadPerTaskExecutor) were silently skipped from JMX registration because LifecycleStrategy.onThreadPoolAdd only accepted ThreadPoolExecutor. This adds overloaded methods for ExecutorService to register non-ThreadPoolExecutor instances in JMX. New ManagedVirtualThreadExecutorMBean exposes basic attributes: id, sourceId, routeId, isVirtualThread, and isShutdown. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../org/apache/camel/spi/LifecycleStrategy.java | 27 ++++++ .../camel/spi/ManagementObjectNameStrategy.java | 7 ++ .../apache/camel/spi/ManagementObjectStrategy.java | 7 ++ .../impl/engine/BaseExecutorServiceManager.java | 10 +++ .../mbean/ManagedVirtualThreadExecutorMBean.java | 46 ++++++++++ .../DefaultManagementObjectNameStrategy.java | 17 ++++ .../DefaultManagementObjectStrategy.java | 12 +++ .../management/JmxManagementLifecycleStrategy.java | 59 ++++++++++++- .../mbean/ManagedVirtualThreadExecutor.java | 98 ++++++++++++++++++++++ .../ManagedVirtualThreadExecutorTest.java | 88 +++++++++++++++++++ 10 files changed, 370 insertions(+), 1 deletion(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/LifecycleStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/LifecycleStrategy.java index 607ac8526805..c035c773be14 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/LifecycleStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/LifecycleStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.spi; import java.util.Collection; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; @@ -209,4 +210,30 @@ public interface LifecycleStrategy { */ void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool); + /** + * Notification on adding an executor service (such as a virtual thread executor) that is not a + * {@link ThreadPoolExecutor}. + * + * @param camelContext the camel context + * @param executorService the executor service + * @param id id of the thread pool (can be null in special cases) + * @param sourceId id of the source creating the thread pool (can be null in special cases) + * @param routeId id of the route for the source (is null if no source) + * @param threadPoolProfileId id of the thread pool profile, if used for creating this thread pool (can be null) + */ + default void onThreadPoolAdd( + CamelContext camelContext, ExecutorService executorService, String id, + String sourceId, String routeId, String threadPoolProfileId) { + } + + /** + * Notification on removing an executor service (such as a virtual thread executor) that is not a + * {@link ThreadPoolExecutor}. + * + * @param camelContext the camel context + * @param executorService the executor service + */ + default void onThreadPoolRemove(CamelContext camelContext, ExecutorService executorService) { + } + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java index 5c3ca2ba3b51..64bd66b15e95 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import javax.management.MalformedObjectNameException; @@ -78,6 +79,12 @@ public interface ManagementObjectNameStrategy { ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool, String id, String sourceId) throws MalformedObjectNameException; + default ObjectName getObjectNameForThreadPool( + CamelContext context, ExecutorService executorService, String id, String sourceId) + throws MalformedObjectNameException { + return null; + } + ObjectName getObjectNameForEventNotifier(CamelContext context, EventNotifier eventNotifier) throws MalformedObjectNameException; } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java index 972ba4c9514a..1a44a9887c0f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; @@ -67,5 +68,11 @@ public interface ManagementObjectStrategy { CamelContext context, ThreadPoolExecutor threadPool, String id, String sourceId, String routeId, String threadPoolProfileId); + default Object getManagedObjectForThreadPool( + CamelContext context, ExecutorService executorService, + String id, String sourceId, String routeId, String threadPoolProfileId) { + return null; + } + Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java index 92a19b4e36ac..58a14151de1b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java @@ -372,6 +372,11 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { lifecycle.onThreadPoolRemove(camelContext, threadPool); } + } else { + // for non-ThreadPoolExecutor instances (e.g., virtual thread executors) + for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { + lifecycle.onThreadPoolRemove(camelContext, executorService); + } } // remove reference as its shutdown (do not remove if fail-safe) @@ -592,6 +597,11 @@ public class BaseExecutorServiceManager extends ServiceSupport implements Execut for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); } + } else { + // for non-ThreadPoolExecutor instances (e.g., virtual thread executors) + for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { + lifecycle.onThreadPoolAdd(camelContext, executorService, id, sourceId, routeId, threadPoolProfileId); + } } // now call strategy to allow custom logic diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedVirtualThreadExecutorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedVirtualThreadExecutorMBean.java new file mode 100644 index 000000000000..90ea5d2fa6b6 --- /dev/null +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedVirtualThreadExecutorMBean.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedVirtualThreadExecutorMBean { + + @ManagedAttribute(description = "Camel ID") + String getCamelId(); + + @ManagedAttribute(description = "Camel ManagementName") + String getCamelManagementName(); + + @ManagedAttribute(description = "Thread Pool ID") + String getId(); + + @ManagedAttribute(description = "ID of source for creating Thread Pool") + String getSourceId(); + + @ManagedAttribute(description = "Route ID for the source, which created the Thread Pool") + String getRouteId(); + + @ManagedAttribute(description = "ID of the thread pool profile which this pool is based upon") + String getThreadPoolProfileId(); + + @ManagedAttribute(description = "Whether this executor uses virtual threads") + boolean isVirtualThread(); + + @ManagedAttribute(description = "Is shutdown") + boolean isShutdown(); +} diff --git a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java index 270d0ffd7f4e..271df0058baa 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.management; import java.net.UnknownHostException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import javax.management.MalformedObjectNameException; @@ -53,6 +54,7 @@ import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStep; import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedThreadPool; +import org.apache.camel.management.mbean.ManagedVirtualThreadExecutor; import org.apache.camel.management.mbean.ManagedTracer; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.EventNotifier; @@ -162,6 +164,9 @@ public class DefaultManagementObjectNameStrategy implements ManagementObjectName objectName = getObjectNameForTracer(mt.getContext(), mt.getTracer()); } else if (managedObject instanceof ManagedThreadPool mes) { objectName = getObjectNameForThreadPool(mes.getContext(), mes.getThreadPool(), mes.getId(), mes.getSourceId()); + } else if (managedObject instanceof ManagedVirtualThreadExecutor mvte) { + objectName = getObjectNameForThreadPool( + mvte.getContext(), mvte.getExecutorService(), mvte.getId(), mvte.getSourceId()); } else if (managedObject instanceof ManagedClusterService mcs) { objectName = getObjectNameForClusterService(mcs.getContext(), mcs.getService()); } else if (managedObject instanceof ManagedService ms) { @@ -422,6 +427,18 @@ public class DefaultManagementObjectNameStrategy implements ManagementObjectName public ObjectName getObjectNameForThreadPool( CamelContext context, ThreadPoolExecutor threadPool, String id, String sourceId) throws MalformedObjectNameException { + return getObjectNameForThreadPool(context, id, sourceId); + } + + @Override + public ObjectName getObjectNameForThreadPool( + CamelContext context, ExecutorService executorService, String id, String sourceId) + throws MalformedObjectNameException { + return getObjectNameForThreadPool(context, id, sourceId); + } + + private ObjectName getObjectNameForThreadPool(CamelContext context, String id, String sourceId) + throws MalformedObjectNameException { StringBuilder buffer = new StringBuilder(); buffer.append(domainName).append(":"); buffer.append(KEY_CONTEXT).append("=").append(getContextId(context)).append(","); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index 6db5f7972358..8553917fbc0c 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.camel.management; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; @@ -103,6 +104,7 @@ import org.apache.camel.management.mbean.ManagedStop; import org.apache.camel.management.mbean.ManagedSupervisingRouteController; import org.apache.camel.management.mbean.ManagedSuspendableRoute; import org.apache.camel.management.mbean.ManagedThreadPool; +import org.apache.camel.management.mbean.ManagedVirtualThreadExecutor; import org.apache.camel.management.mbean.ManagedThreads; import org.apache.camel.management.mbean.ManagedThrottler; import org.apache.camel.management.mbean.ManagedThroughputLogger; @@ -284,6 +286,16 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy return mtp; } + @Override + public Object getManagedObjectForThreadPool( + CamelContext context, ExecutorService executorService, + String id, String sourceId, String routeId, String threadPoolProfileId) { + ManagedVirtualThreadExecutor mvte + = new ManagedVirtualThreadExecutor(context, executorService, id, sourceId, routeId, threadPoolProfileId); + mvte.init(context.getManagementStrategy()); + return mvte; + } + @Override public Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier) { ManagedEventNotifier men = new ManagedEventNotifier(context, eventNotifier); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index f654deaa3eaf..b3629136e178 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import javax.management.JMException; @@ -149,7 +150,7 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li private final Set<String> knowRouteIds = new HashSet<>(); private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>(); private final Map<DefaultBacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>(); - private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>(); + private final Map<Object, Object> managedThreadPools = new HashMap<>(); public JmxManagementLifecycleStrategy() { } @@ -815,6 +816,62 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li } } + @Override + public void onThreadPoolAdd( + CamelContext camelContext, ExecutorService executorService, String id, + String sourceId, String routeId, String threadPoolProfileId) { + + if (!initialized) { + preServices + .add(lf -> lf.onThreadPoolAdd(camelContext, executorService, id, sourceId, routeId, + threadPoolProfileId)); + return; + } + + if (!shouldRegister(executorService, null)) { + return; + } + + Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, executorService, id, sourceId, + routeId, threadPoolProfileId); + if (mtp == null) { + return; + } + + if (getManagementStrategy().isManaged(mtp)) { + LOG.trace("The executor service is already managed: {}", executorService); + return; + } + + try { + manageObject(mtp); + managedThreadPools.put(executorService, mtp); + } catch (Exception e) { + LOG.warn("Could not register executor service: {} as ThreadPool MBean.", executorService, e); + } + } + + @Override + public void onThreadPoolRemove(CamelContext camelContext, ExecutorService executorService) { + if (!initialized) { + return; + } + + Object mtp = managedThreadPools.remove(executorService); + if (mtp != null) { + if (!getManagementStrategy().isManaged(mtp)) { + LOG.trace("The executor service is not managed: {}", executorService); + return; + } + + try { + unmanageObject(mtp); + } catch (Exception e) { + LOG.warn("Could not unregister ThreadPool MBean", e); + } + } + } + @Override public void onRouteContextCreate(Route route) { // Create a map (ProcessorType -> PerformanceCounter) diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedVirtualThreadExecutor.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedVirtualThreadExecutor.java new file mode 100644 index 000000000000..b403061726dc --- /dev/null +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedVirtualThreadExecutor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.concurrent.ExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedVirtualThreadExecutorMBean; +import org.apache.camel.spi.ManagementStrategy; + +@ManagedResource(description = "Managed VirtualThread Executor") +public class ManagedVirtualThreadExecutor implements ManagedVirtualThreadExecutorMBean { + + private final CamelContext camelContext; + private final ExecutorService executorService; + private final String id; + private final String sourceId; + private final String routeId; + private final String threadPoolProfileId; + + public ManagedVirtualThreadExecutor(CamelContext camelContext, ExecutorService executorService, String id, + String sourceId, String routeId, String threadPoolProfileId) { + this.camelContext = camelContext; + this.executorService = executorService; + this.id = id; + this.sourceId = sourceId; + this.routeId = routeId; + this.threadPoolProfileId = threadPoolProfileId; + } + + public void init(ManagementStrategy strategy) { + // do nothing + } + + public CamelContext getContext() { + return camelContext; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + @Override + public String getCamelId() { + return camelContext.getName(); + } + + @Override + public String getCamelManagementName() { + return camelContext.getManagementName(); + } + + @Override + public String getId() { + return id; + } + + @Override + public String getSourceId() { + return sourceId; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public String getThreadPoolProfileId() { + return threadPoolProfileId; + } + + @Override + public boolean isVirtualThread() { + return true; + } + + @Override + public boolean isShutdown() { + return executorService.isShutdown(); + } + +} diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java new file mode 100644 index 000000000000..bc823a9f2adb --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedVirtualThreadExecutorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.LifecycleStrategy; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; + +import static org.apache.camel.management.DefaultManagementObjectNameStrategy.TYPE_THREAD_POOL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@EnabledForJreRange(min = JRE.JAVA_21) +public class ManagedVirtualThreadExecutorTest extends ManagementTestSupport { + + private ExecutorService vte; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext ctx = super.createCamelContext(); + // register the virtual thread executor during context creation (before it starts) + // so that shouldRegister returns true + vte = Executors.newVirtualThreadPerTaskExecutor(); + for (LifecycleStrategy lifecycle : ctx.getLifecycleStrategies()) { + lifecycle.onThreadPoolAdd(ctx, vte, "myVirtualPool", "test", null, null); + } + return ctx; + } + + @Test + public void testManagedVirtualThreadExecutor() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + + ObjectName on = getCamelObjectName(TYPE_THREAD_POOL, "myVirtualPool(test)"); + + Boolean shutdown = (Boolean) mbeanServer.getAttribute(on, "Shutdown"); + assertFalse(shutdown.booleanValue()); + + Boolean virtualThread = (Boolean) mbeanServer.getAttribute(on, "VirtualThread"); + assertTrue(virtualThread.booleanValue()); + + String id = (String) mbeanServer.getAttribute(on, "Id"); + assertEquals("myVirtualPool", id); + + String sourceId = (String) mbeanServer.getAttribute(on, "SourceId"); + assertEquals("test", sourceId); + + // cleanup + for (LifecycleStrategy lifecycle : context.getLifecycleStrategies()) { + lifecycle.onThreadPoolRemove(context, vte); + } + vte.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("mock:result"); + } + }; + } +}
