Repository: camel Updated Branches: refs/heads/master fd4378ede -> 48f76064e
CAMEL-9791: Threads EIP should use the rejection handler if configured. And only use caller runs as fallback. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48f76064 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48f76064 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48f76064 Branch: refs/heads/master Commit: 48f76064ef65ffd828940efc5fc078c638350085 Parents: fd4378e Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Apr 12 11:40:48 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Apr 12 11:53:54 2016 +0200 ---------------------------------------------------------------------- .../camel/management/mbean/ManagedThreads.java | 11 ++- .../apache/camel/model/ThreadsDefinition.java | 30 +++++--- .../camel/processor/ThreadsProcessor.java | 72 +++++++++----------- 3 files changed, 59 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java index 408dbcb..fe78b10 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThreads.java @@ -40,13 +40,18 @@ public class ManagedThreads extends ManagedProcessor implements ManagedThreadsMB @Override public Boolean isCallerRunsWhenRejected() { - return processor.isCallerRunsWhenRejected(); + if (processor.getExecutorService() instanceof ThreadPoolExecutor) { + String name = getRejectedPolicy(); + return "CallerRuns".equals(name); + } else { + return null; + } } @Override public String getRejectedPolicy() { - if (processor.getRejectedPolicy() != null) { - return processor.getRejectedPolicy().name(); + if (processor.getExecutorService() instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor) processor.getExecutorService()).getRejectedExecutionHandler().toString(); } else { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java index 065be28..a287659 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java @@ -85,6 +85,19 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple // prefer any explicit configured executor service boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); + + // resolve what rejected policy to use + ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext); + if (policy == null) { + if (callerRunsWhenRejected == null || callerRunsWhenRejected) { + // should use caller runs by default if not configured + policy = ThreadPoolRejectedPolicy.CallerRuns; + } else { + policy = ThreadPoolRejectedPolicy.Abort; + } + } + log.debug("Using ThreadPoolRejectedPolicy: {}", policy); + // if no explicit then create from the options if (threadPool == null) { ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); @@ -94,7 +107,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple .maxPoolSize(getMaxPoolSize()) .keepAliveTime(getKeepAliveTime(), getTimeUnit()) .maxQueueSize(getMaxQueueSize()) - .rejectedPolicy(getRejectedPolicy()) + .rejectedPolicy(policy) .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut()) .build(); threadPool = manager.newThreadPool(this, name, profile); @@ -112,6 +125,9 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple if (getKeepAliveTime() != null) { throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together."); } + if (getTimeUnit() != null) { + throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together."); + } if (getMaxQueueSize() != null) { throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together."); } @@ -123,14 +139,7 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple } } - ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool); - if (getCallerRunsWhenRejected() == null) { - // should be true by default - thread.setCallerRunsWhenRejected(true); - } else { - thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); - } - thread.setRejectedPolicy(resolveRejectedPolicy(routeContext)); + ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy); List<Processor> pipe = new ArrayList<Processor>(2); pipe.add(thread); @@ -262,7 +271,8 @@ public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> imple } /** - * Whether or not the caller should run the task when it was rejected by the thread pool. + * Whether or not to use as caller runs as <b>fallback</b> when a task is rejected being added to the thread pool (when its full). + * This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. * <p/> * Is by default <tt>true</tt> * http://git-wip-us.apache.org/repos/asf/camel/blob/48f76064/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index 2f97943..b3c6dc4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -18,6 +18,8 @@ package org.apache.camel.processor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.AsyncCallback; @@ -62,42 +64,39 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, private String id; private final CamelContext camelContext; private final ExecutorService executorService; + private final ThreadPoolRejectedPolicy rejectedPolicy; private volatile boolean shutdownExecutorService; private final AtomicBoolean shutdown = new AtomicBoolean(true); - private boolean callerRunsWhenRejected = true; - private ThreadPoolRejectedPolicy rejectedPolicy; private final class ProcessCall implements Runnable, Rejectable { private final Exchange exchange; private final AsyncCallback callback; + private final boolean done; - ProcessCall(Exchange exchange, AsyncCallback callback) { + ProcessCall(Exchange exchange, AsyncCallback callback, boolean done) { this.exchange = exchange; this.callback = callback; + this.done = done; } @Override public void run() { - LOG.trace("Continue routing exchange {} ", exchange); + LOG.trace("Continue routing exchange {}", exchange); if (shutdown.get()) { exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); } - callback.done(false); + callback.done(done); } @Override public void reject() { - // abort should mark the exchange with an rejected exception - boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy; - if (abort) { - exchange.setException(new RejectedExecutionException()); - } - LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange); - + // reject should mark the exchange with an rejected exception and mark not to route anymore + exchange.setException(new RejectedExecutionException()); + LOG.trace("Rejected routing exchange {}", exchange); if (shutdown.get()) { exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); } - callback.done(false); + callback.done(done); } @Override @@ -106,12 +105,14 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, } } - public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) { + public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService, ThreadPoolRejectedPolicy rejectedPolicy) { ObjectHelper.notNull(camelContext, "camelContext"); ObjectHelper.notNull(executorService, "executorService"); + ObjectHelper.notNull(rejectedPolicy, "rejectedPolicy"); this.camelContext = camelContext; this.executorService = executorService; this.shutdownExecutorService = shutdownExecutorService; + this.rejectedPolicy =rejectedPolicy; } public void process(final Exchange exchange) throws Exception { @@ -131,43 +132,28 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, return true; } - ProcessCall call = new ProcessCall(exchange, callback); try { + // process the call in asynchronous mode + ProcessCall call = new ProcessCall(exchange, callback, false); LOG.trace("Submitting task {}", call); executorService.submit(call); // tell Camel routing engine we continue routing asynchronous return false; - } catch (RejectedExecutionException e) { - boolean callerRuns = isCallerRunsWhenRejected(); - if (!callerRuns) { + } catch (Throwable e) { + if (executorService instanceof ThreadPoolExecutor) { + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService; + // process the call in synchronous mode + ProcessCall call = new ProcessCall(exchange, callback, true); + rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe); + return true; + } else { exchange.setException(e); + callback.done(true); + return true; } - - LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call); - if (shutdown.get()) { - exchange.setException(new RejectedExecutionException()); - } - callback.done(true); - return true; } } - public boolean isCallerRunsWhenRejected() { - return callerRunsWhenRejected; - } - - public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { - this.callerRunsWhenRejected = callerRunsWhenRejected; - } - - public ThreadPoolRejectedPolicy getRejectedPolicy() { - return rejectedPolicy; - } - - public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { - this.rejectedPolicy = rejectedPolicy; - } - public ExecutorService getExecutorService() { return executorService; } @@ -184,6 +170,10 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, this.id = id; } + public ThreadPoolRejectedPolicy getRejectedPolicy() { + return rejectedPolicy; + } + protected void doStart() throws Exception { shutdown.set(false); }