Author: davsclaus Date: Fri Mar 19 11:39:01 2010 New Revision: 925181 URL: http://svn.apache.org/viewvc?rev=925181&view=rev Log: CAMEL-1588: ThreadPools is now managable from JMX.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java - copied, changed from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java 
(original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Fri Mar 19 11:39:01 2010 @@ -54,8 +54,8 @@ public class DefaultConsumerTemplate imp return consumerCache.receive(endpoint); } - public Exchange receive(Endpoint endpoinit) { - return receive(endpoinit.getEndpointUri()); + public Exchange receive(Endpoint endpoint) { + return receive(endpoint.getEndpointUri()); } public Exchange receive(String endpointUri, long timeout) { @@ -137,7 +137,7 @@ public class DefaultConsumerTemplate imp /** * Extracts the body from the given result. * <p/> - * If the exchange pattern is provided it will try to honor it and retrive the body + * If the exchange pattern is provided it will try to honor it and retrieve the body * from either IN or OUT according to the pattern. * * @param result the result Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Fri Mar 19 11:39:01 2010 @@ -21,10 +21,12 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.spi.ExecutorServiceStrategy; +import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.util.concurrent.ExecutorServiceHelper; import 
org.apache.commons.logging.Log; @@ -91,8 +93,7 @@ public class DefaultExecutorServiceStrat public ExecutorService newCachedThreadPool(Object source, String name) { ExecutorService answer = ExecutorServiceHelper.newCachedThreadPool(threadNamePattern, name, true); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new cached thread pool for source: " + source + " with name: " + name + ". -> " + answer); @@ -102,8 +103,7 @@ public class DefaultExecutorServiceStrat public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { ScheduledExecutorService answer = ExecutorServiceHelper.newScheduledThreadPool(poolSize, threadNamePattern, name, true); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new scheduled thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. -> " + answer); @@ -113,8 +113,7 @@ public class DefaultExecutorServiceStrat public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { ExecutorService answer = ExecutorServiceHelper.newFixedThreadPool(poolSize, threadNamePattern, name, true); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new fixed thread pool for source: " + source + " with name: " + name + ". [poolSize=" + poolSize + "]. 
-> " + answer); @@ -124,8 +123,7 @@ public class DefaultExecutorServiceStrat public ExecutorService newSingleThreadExecutor(Object source, String name) { ExecutorService answer = ExecutorServiceHelper.newSingleThreadExecutor(threadNamePattern, name, true); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new single thread pool for source: " + source + " with name: " + name + ". -> " + answer); @@ -135,8 +133,7 @@ public class DefaultExecutorServiceStrat public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) { ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize @@ -150,8 +147,7 @@ public class DefaultExecutorServiceStrat boolean daemon) { ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, maxQueueSize, rejectedExecutionHandler, daemon); - executorServices.add(answer); - onNewExecutorService(answer); + onThreadPoolCreated(answer); if (LOG.isDebugEnabled()) { LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". 
[poolSize=" + corePoolSize @@ -192,6 +188,22 @@ public class DefaultExecutorServiceStrat return answer; } + private void onThreadPoolCreated(ExecutorService executorService) { + // add to internal list of thread pools + executorServices.add(executorService); + + // let lifecycle strategy be notified as well which can let it be managed in JMX as well + if (executorService instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService; + for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { + lifecycle.onThreadPoolAdd(camelContext, threadPool); + } + } + + // now call strategy to allow custom logic + onNewExecutorService(executorService); + } + /** * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created. * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java Fri Mar 19 11:39:01 2010 @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; import javax.management.JMException; import org.apache.camel.CamelContext; @@ -50,6 +51,7 @@ import org.apache.camel.management.mbean import org.apache.camel.management.mbean.ManagedScheduledPollConsumer; import org.apache.camel.management.mbean.ManagedSendProcessor; import org.apache.camel.management.mbean.ManagedService; +import org.apache.camel.management.mbean.ManagedThreadPool; 
import org.apache.camel.management.mbean.ManagedThrottler; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; @@ -434,7 +436,31 @@ public class DefaultManagementLifecycleS try { getManagementStrategy().manageObject(me); } catch (Exception e) { - LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandlerMBean.", e); + LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); + } + } + + public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) { + // the agent hasn't been started + if (!initialized) { + return; + } + + ManagedThreadPool mtp = new ManagedThreadPool(camelContext, threadPool); + mtp.init(getManagementStrategy()); + + // skip already managed services, for example if a route has been restarted + if (getManagementStrategy().isManaged(mtp, null)) { + if (LOG.isTraceEnabled()) { + LOG.trace("The thread pool is already managed: " + threadPool); + } + return; + } + + try { + getManagementStrategy().manageObject(mtp); + } catch (Exception e) { + LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java Fri Mar 19 11:39:01 2010 @@ -18,6 +18,7 @@ package org.apache.camel.management; import java.net.InetAddress; import java.net.UnknownHostException; 
+import java.util.concurrent.ThreadPoolExecutor; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -54,6 +55,7 @@ public class DefaultManagementNamingStra public static final String TYPE_COMPONENT = "components"; public static final String TYPE_TRACER = "tracer"; public static final String TYPE_ERRORHANDLER = "errorhandlers"; + public static final String TYPE_THREAD_POOL = "threadpools"; public static final String TYPE_SERVICE = "services"; protected String domainName; @@ -228,6 +230,17 @@ public class DefaultManagementNamingStra return createObjectName(buffer); } + public ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException { + StringBuffer buffer = new StringBuffer(); + buffer.append(domainName).append(":"); + buffer.append(KEY_CONTEXT + "=").append(getContextId(context)).append(","); + buffer.append(KEY_TYPE + "=" + TYPE_THREAD_POOL + ","); + buffer.append(KEY_NAME + "=") + .append(threadPool.getClass().getSimpleName()) + .append("(").append(ObjectHelper.getIdentityHashCode(threadPool)).append(")"); + return createObjectName(buffer); + } + public String getDomainName() { return domainName; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java Fri Mar 19 11:39:01 2010 @@ -29,6 +29,7 @@ import org.apache.camel.management.mbean import org.apache.camel.management.mbean.ManagedProducer; import org.apache.camel.management.mbean.ManagedRoute; import 
org.apache.camel.management.mbean.ManagedService; +import org.apache.camel.management.mbean.ManagedThreadPool; import org.apache.camel.management.mbean.ManagedTracer; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.spi.ManagementAgent; @@ -110,6 +111,9 @@ public class ManagedManagementStrategy e } else if (managedObject instanceof ManagedTracer) { ManagedTracer mt = (ManagedTracer) managedObject; objectName = getManagementNamingStrategy().getObjectNameForTracer(mt.getCamelContext(), mt.getTracer()); + } else if (managedObject instanceof ManagedThreadPool) { + ManagedThreadPool mes = (ManagedThreadPool) managedObject; + objectName = getManagementNamingStrategy().getObjectNameForThreadPool(mes.getCamelContext(), mes.getThreadPool()); } else if (managedObject instanceof ManagedService) { // check for managed service should be last ManagedService ms = (ManagedService) managedObject; Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java?rev=925181&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java Fri Mar 19 11:39:01 2010 @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ManagementStrategy; +import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.jmx.export.annotation.ManagedResource; + +/** + * @version $Revision$ + */ +@ManagedResource(description = "Managed ThreadPool") +public class ManagedThreadPool { + + private final CamelContext camelContext; + private final ThreadPoolExecutor threadPool; + + public ManagedThreadPool(CamelContext camelContext, ThreadPoolExecutor threadPool) { + this.camelContext = camelContext; + this.threadPool = threadPool; + } + + public void init(ManagementStrategy strategy) { + // do nothing + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public ThreadPoolExecutor getThreadPool() { + return threadPool; + } + + @ManagedAttribute(description = "Core pool size") + public int getCorePoolSize() { + return threadPool.getCorePoolSize(); + } + + @ManagedAttribute(description = "Core pool size") + public void setCorePoolSize(int corePoolSize) { + threadPool.setCorePoolSize(corePoolSize); + } + + @ManagedAttribute(description = "Pool size") + public int getPoolSize() { + return threadPool.getPoolSize(); + } + + @ManagedAttribute(description = "Maximum pool size") + public int getMaximumPoolSize() { + return threadPool.getMaximumPoolSize(); + } + + @ManagedAttribute(description = "Maximum pool size") + public void 
setMaximumPoolSize(int maximumPoolSize) { + threadPool.setMaximumPoolSize(maximumPoolSize); + } + + @ManagedAttribute(description = "Largest pool size") + public int getLargestPoolSize() { + return threadPool.getLargestPoolSize(); + } + + @ManagedAttribute(description = "Active count") + public int getActiveCount() { + return threadPool.getActiveCount(); + } + + @ManagedAttribute(description = "Task count") + public long getTaskCount() { + return threadPool.getTaskCount(); + } + + @ManagedAttribute(description = "Completed task count") + public long getCompletedTaskCount() { + return threadPool.getCompletedTaskCount(); + } + + @ManagedAttribute(description = "Keep alive time in seconds") + public long getKeepAliveTime() { + return threadPool.getKeepAliveTime(TimeUnit.SECONDS); + } + + @ManagedAttribute(description = "Keep alive time in seconds") + public void setKeepAliveTime(int keepAliveTimeInSeconds) { + threadPool.setKeepAliveTime(keepAliveTimeInSeconds, TimeUnit.SECONDS); + } + + @ManagedAttribute(description = "Is shutdown") + public boolean isShutdown() { + return threadPool.isShutdown(); + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreadPool.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java (original) +++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedTracer.java Fri Mar 19 11:39:01 2010 @@ -30,8 +30,8 @@ import org.springframework.jmx.export.an @ManagedResource(description = "Managed Tracer") public class ManagedTracer { - private CamelContext camelContext; - private Tracer tracer; + private final CamelContext camelContext; + private final Tracer tracer; public ManagedTracer(CamelContext camelContext, Tracer tracer) { this.camelContext = camelContext; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/LifecycleStrategy.java Fri Mar 19 11:39:01 2010 @@ -17,6 +17,7 @@ package org.apache.camel.spi; import java.util.Collection; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; import org.apache.camel.Component; @@ -123,4 +124,12 @@ public interface LifecycleStrategy { */ void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerBuilder errorHandlerBuilder); + /** + * Notification on adding a thread pool. 
+ * + * @param camelContext the camel context + * @param threadPool the thread pool + */ + void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool); + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java Fri Mar 19 11:39:01 2010 @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import java.util.concurrent.ThreadPoolExecutor; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -56,4 +57,7 @@ public interface ManagementNamingStrateg ObjectName getObjectNameForTracer(CamelContext context, InterceptStrategy tracer) throws MalformedObjectNameException; ObjectName getObjectNameForService(CamelContext context, Service service) throws MalformedObjectNameException; + + ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool) throws MalformedObjectNameException; + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 19 11:39:01 2010 @@ -22,6 +22,7 @@ import 
java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -191,9 +192,14 @@ public final class ExecutorServiceHelper } BlockingQueue<Runnable> queue; - if (maxQueueSize <= 0) { + if (corePoolSize == 0 && maxQueueSize <= 0) { + // use a synchronous so we can act like the cached thread pool + queue = new SynchronousQueue<Runnable>(); + } else if (maxQueueSize <= 0) { + // unbounded task queue queue = new LinkedBlockingQueue<Runnable>(); } else { + // bounded task queue queue = new LinkedBlockingQueue<Runnable>(maxQueueSize); } ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, queue); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010 @@ -19,6 +19,7 @@ package org.apache.camel.impl; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; import org.apache.camel.Component; @@ -85,6 +86,10 @@ public class DummyLifecycleStrategy impl events.add("onErrorHandlerAdd"); } + public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) { + events.add("onThreadPoolAdd"); + } + public 
List<String> getEvents() { return events; } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java (from r925107, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java&r1=925107&r2=925181&rev=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedThreadPoolTest.java Fri Mar 19 11:39:01 2010 @@ -17,19 +17,17 @@ package org.apache.camel.management; import java.util.Set; -import javax.management.Attribute; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; -import org.apache.camel.LoggingLevel; import org.apache.camel.builder.RouteBuilder; /** * @version $Revision$ */ -public class ManagedTracerOptionsTest extends ContextTestSupport { +public class ManagedThreadPoolTest extends ContextTestSupport { @Override protected CamelContext createCamelContext() throws Exception { @@ -40,127 +38,40 @@ public class ManagedTracerOptionsTest ex return context; } - public void testManagedErrorHandlerOptions() throws Exception { + public void testManagedThreadPool() throws Exception { MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer(); - Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null); + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null); 
assertEquals(1, set.size()); ObjectName on = set.iterator().next(); - mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE)); - Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); - assertEquals(true, enabled.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("DestinationUri", null)); - String duri = (String) mbeanServer.getAttribute(on, "DestinationUri"); - assertEquals(null, duri); - - mbeanServer.setAttribute(on, new Attribute("DestinationUri", "mock://traced")); - duri = (String) mbeanServer.getAttribute(on, "DestinationUri"); - assertEquals("mock://traced", duri); - - Boolean useJpa = (Boolean) mbeanServer.getAttribute(on, "UseJpa"); - assertEquals(false, useJpa.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("LogName", "foo")); - String ln = (String) mbeanServer.getAttribute(on, "LogName"); - assertEquals("foo", ln); - - mbeanServer.setAttribute(on, new Attribute("LogLevel", "WARN")); - String ll = (String) mbeanServer.getAttribute(on, "LogLevel"); - assertEquals(LoggingLevel.WARN.name(), ll); - - mbeanServer.setAttribute(on, new Attribute("LogStackTrace", Boolean.TRUE)); - Boolean lst = (Boolean) mbeanServer.getAttribute(on, "LogStackTrace"); - assertEquals(true, lst.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("TraceInterceptors", Boolean.TRUE)); - Boolean ti = (Boolean) mbeanServer.getAttribute(on, "TraceInterceptors"); - assertEquals(true, ti.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("TraceExceptions", Boolean.TRUE)); - Boolean te = (Boolean) mbeanServer.getAttribute(on, "TraceExceptions"); - assertEquals(true, te.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("TraceOutExchanges", Boolean.TRUE)); - Boolean toe = (Boolean) mbeanServer.getAttribute(on, "TraceOutExchanges"); - assertEquals(true, toe.booleanValue()); + Boolean shutdown = (Boolean) mbeanServer.getAttribute(on, "Shutdown"); + assertEquals(false, shutdown.booleanValue()); - 
doAssertFormatter(mbeanServer, on); + Integer corePoolSize = (Integer) mbeanServer.getAttribute(on, "CorePoolSize"); + assertEquals(15, corePoolSize.intValue()); + + Integer maxPoolSize = (Integer) mbeanServer.getAttribute(on, "MaximumPoolSize"); + assertEquals(30, maxPoolSize.intValue()); + + Integer poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize"); + assertEquals(0, poolSize.intValue()); + + Long keepAlive = (Long) mbeanServer.getAttribute(on, "KeepAliveTime"); + assertEquals(60, keepAlive.intValue()); getMockEndpoint("mock:result").expectedMessageCount(1); template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); - } - private void doAssertFormatter(MBeanServer mbeanServer, ObjectName on) throws Exception { - mbeanServer.setAttribute(on, new Attribute("FormatterShowBody", Boolean.TRUE)); - Boolean fsb = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBody"); - assertEquals(true, fsb.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowBodyType", Boolean.TRUE)); - Boolean fsbt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBodyType"); - assertEquals(true, fsbt.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBody", Boolean.TRUE)); - Boolean fsob = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBody"); - assertEquals(true, fsob.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowOutBodyType", Boolean.TRUE)); - Boolean fsobt = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutBodyType"); - assertEquals(true, fsobt.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowBreadCrumb", Boolean.TRUE)); - Boolean fsbc = (Boolean) mbeanServer.getAttribute(on, "FormatterShowBreadCrumb"); - assertEquals(true, fsbc.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangeId", Boolean.TRUE)); - Boolean fsei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangeId"); - 
assertEquals(true, fsei.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowShortExchangeId", Boolean.TRUE)); - Boolean fssei = (Boolean) mbeanServer.getAttribute(on, "FormatterShowShortExchangeId"); - assertEquals(true, fssei.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowHeaders", Boolean.TRUE)); - Boolean fsh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowHeaders"); - assertEquals(true, fsh.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowOutHeaders", Boolean.TRUE)); - Boolean fsoh = (Boolean) mbeanServer.getAttribute(on, "FormatterShowOutHeaders"); - assertEquals(true, fsoh.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowProperties", Boolean.TRUE)); - Boolean fsp = (Boolean) mbeanServer.getAttribute(on, "FormatterShowProperties"); - assertEquals(true, fsp.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowNode", Boolean.TRUE)); - Boolean fsn = (Boolean) mbeanServer.getAttribute(on, "FormatterShowNode"); - assertEquals(true, fsn.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowRouteId", Boolean.FALSE)); - Boolean fsr = (Boolean) mbeanServer.getAttribute(on, "FormatterShowRouteId"); - assertEquals(false, fsr.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowExchangePattern", Boolean.TRUE)); - Boolean fsep = (Boolean) mbeanServer.getAttribute(on, "FormatterShowExchangePattern"); - assertEquals(true, fsep.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterShowException", Boolean.TRUE)); - Boolean fsex = (Boolean) mbeanServer.getAttribute(on, "FormatterShowException"); - assertEquals(true, fsex.booleanValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterBreadCrumbLength", 100)); - Integer fbcl = (Integer) mbeanServer.getAttribute(on, "FormatterBreadCrumbLength"); - assertEquals(100, fbcl.intValue()); - - 
mbeanServer.setAttribute(on, new Attribute("FormatterNodeLength", 50)); - Integer fnl = (Integer) mbeanServer.getAttribute(on, "FormatterNodeLength"); - assertEquals(50, fnl.intValue()); - - mbeanServer.setAttribute(on, new Attribute("FormatterMaxChars", 250)); - Integer fmc = (Integer) mbeanServer.getAttribute(on, "FormatterMaxChars"); - assertEquals(250, fmc.intValue()); + poolSize = (Integer) mbeanServer.getAttribute(on, "PoolSize"); + assertEquals(1, poolSize.intValue()); + + Integer largest = (Integer) mbeanServer.getAttribute(on, "LargestPoolSize"); + assertEquals(1, largest.intValue()); + + Long completed = (Long) mbeanServer.getAttribute(on, "CompletedTaskCount"); + assertEquals(1, completed.intValue()); } @Override @@ -168,7 +79,7 @@ public class ManagedTracerOptionsTest ex return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:result"); + from("direct:start").threads(15, 30).to("mock:result"); } }; } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedTracerOptionsTest.java Fri Mar 19 11:39:01 2010 @@ -40,7 +40,7 @@ public class ManagedTracerOptionsTest ex return context; } - public void testManagedErrorHandlerOptions() throws Exception { + public void testManagedTracerOptions() throws Exception { MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer(); Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=tracer,*"), null); Modified: 
camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java (original) +++ camel/trunk/components/camel-osgi/src/main/java/org/apache/camel/osgi/OsgiServiceRegistry.java Fri Mar 19 11:39:01 2010 @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; import org.apache.camel.Component; @@ -129,4 +130,9 @@ public class OsgiServiceRegistry impleme public void onErrorHandlerAdd(RouteContext routeContext, Processor processor, ErrorHandlerBuilder errorHandlerBuilder) { // Do nothing here } + + public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) { + // Do nothing here + } + } Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java?rev=925181&r1=925180&r2=925181&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/DummyLifecycleStrategy.java Fri Mar 19 11:39:01 2010 @@ -17,6 +17,7 @@ package org.apache.camel.spring; import java.util.Collection; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.camel.CamelContext; import 
org.apache.camel.Component; @@ -70,4 +71,7 @@ public class DummyLifecycleStrategy impl public void onRoutesAdd(Collection<Route> routes) { } + + public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool) { + } }