This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 340407b CAMEL-14591: Fixed thread pool for recipinent list EIP should
only be created when really needed (timeout enabled) and that the pool is also
shutdown.
340407b is described below
commit 340407b2556bf3e58178b6d1c749e393a12b6898
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Feb 19 12:01:49 2020 +0100
CAMEL-14591: Fixed thread pool for recipinent list EIP should only be
created when really needed (timeout enabled) and that the pool is also shutdown.
---
.../apache/camel/processor/MulticastProcessor.java | 16 +++++++++++++--
.../org/apache/camel/processor/RecipientList.java | 23 +++++++++++-----------
.../java/org/apache/camel/processor/Splitter.java | 1 -
.../camel/processor/RecipientListNoCacheTest.java | 5 +++++
.../RecipientListParallelTimeoutTest.java | 22 ++++++++++++++++++++-
5 files changed, 51 insertions(+), 16 deletions(-)
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index dd2ae22..ccd9d01 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -161,6 +161,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
private final ExecutorService executorService;
private final boolean shutdownExecutorService;
private ExecutorService aggregateExecutorService;
+ private boolean shutdownAggregateExecutorService;
private final long timeout;
private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers
= new ConcurrentHashMap<>();
private final boolean shareUnitOfWork;
@@ -802,12 +803,13 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
if (isParallelProcessing() && executorService == null) {
throw new IllegalArgumentException("ParallelProcessing is enabled
but ExecutorService has not been set");
}
- if (aggregateExecutorService == null) {
+ if (timeout > 0 && aggregateExecutorService == null) {
// use unbounded thread pool so we ensure the aggregate on-the-fly
task always will have assigned a thread
// and run the tasks when the task is submitted. If not then the
aggregate task may not be able to run
// and signal completion during processing, which would lead to
what would appear as a dead-lock or a slow processing
String name = getClass().getSimpleName() + "-AggregateTask";
aggregateExecutorService = createAggregateExecutorService(name);
+ shutdownAggregateExecutorService = true;
}
if (aggregationStrategy instanceof CamelContextAware) {
((CamelContextAware)
aggregationStrategy).setCamelContext(camelContext);
@@ -842,7 +844,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
if (shutdownExecutorService && executorService != null) {
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
}
- if (aggregateExecutorService != null) {
+ if (shutdownAggregateExecutorService && aggregateExecutorService !=
null) {
getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
}
}
@@ -968,6 +970,16 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
return shareUnitOfWork;
}
+ public ExecutorService getAggregateExecutorService() {
+ return aggregateExecutorService;
+ }
+
+ public void setAggregateExecutorService(ExecutorService
aggregateExecutorService) {
+ this.aggregateExecutorService = aggregateExecutorService;
+ // we use a custom executor so do not shutdown
+ this.shutdownAggregateExecutorService = false;
+ }
+
@Override
public List<Processor> next() {
if (!hasNext()) {
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index e913ffd..4903429 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -72,7 +72,7 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
private boolean shareUnitOfWork;
private ExecutorService executorService;
private boolean shutdownExecutorService;
- private ExecutorService aggregateExecutorService;
+ private volatile ExecutorService aggregateExecutorService;
private AggregationStrategy aggregationStrategy = new
UseLatestAggregationStrategy();
public RecipientList(CamelContext camelContext) {
@@ -190,21 +190,13 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
RecipientListProcessor rlp = new
RecipientListProcessor(exchange.getContext(), producerCache, iter,
getAggregationStrategy(),
isParallelProcessing(), getExecutorService(),
isShutdownExecutorService(),
isStreaming(), isStopOnException(), getTimeout(),
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
- isStopOnAggregateException()) {
- @Override
- protected synchronized ExecutorService
createAggregateExecutorService(String name) {
- // use a shared executor service to avoid creating new thread
pools
- if (aggregateExecutorService == null) {
- aggregateExecutorService =
super.createAggregateExecutorService("RecipientList-AggregateTask");
- }
- return aggregateExecutorService;
- }
- };
+ isStopOnAggregateException());
+ rlp.setAggregateExecutorService(aggregateExecutorService);
rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
rlp.setId(getId());
rlp.setRouteId(getRouteId());
- // start the service
+ // start ourselves
try {
ServiceHelper.startService(rlp);
} catch (Exception e) {
@@ -232,6 +224,10 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
LOG.debug("RecipientList {} using ProducerCache with
cacheSize={}", this, cacheSize);
}
}
+ if (timeout > 0) {
+ // use a cached thread pool so we each on-the-fly task has a
dedicated thread to process completions as they come in
+ aggregateExecutorService =
camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
"RecipientList-AggregateTask", 0);
+ }
ServiceHelper.startService(aggregationStrategy, producerCache);
}
@@ -244,6 +240,9 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownServices(producerCache,
aggregationStrategy);
+ if (aggregateExecutorService != null) {
+
camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
+ }
if (shutdownExecutorService && executorService != null) {
camelContext.getExecutorServiceManager().shutdownNow(executorService);
}
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index 0f09d12..491358c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -81,7 +81,6 @@ public class Splitter extends MulticastProcessor implements
AsyncProcessor, Trac
public boolean process(Exchange exchange, final AsyncCallback callback) {
AggregationStrategy strategy = getAggregationStrategy();
-
// set original exchange if not already pre-configured
if (strategy instanceof UseOriginalAggregationStrategy) {
// need to create a new private instance, as we can also have
concurrency issue so we cannot store state
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 2660405..f4a9575 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Processor;
@@ -52,6 +53,10 @@ public class RecipientListNoCacheTest extends
ContextTestSupport {
Object pc =
ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl);
assertNotNull(pc);
assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+ // and no thread pool is in use as timeout is 0
+ pc =
ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
rl);
+ assertNull(pc);
}
protected void sendBody(String body) {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
index 84458dd..b93b50d 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
@@ -16,11 +16,16 @@
*/
package org.apache.camel.processor;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
import org.apache.camel.AggregationStrategy;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ReflectionHelper;
import org.junit.Test;
public class RecipientListParallelTimeoutTest extends ContextTestSupport {
@@ -34,6 +39,21 @@ public class RecipientListParallelTimeoutTest extends
ContextTestSupport {
template.sendBodyAndHeader("direct:start", "Hello", "slip",
"direct:a,direct:b,direct:c");
assertMockEndpointsSatisfied();
+
+ // make sure that the thread pool will be shutdown
+ List<Processor> list = context.getRoute("route1").filter("foo");
+ RecipientList rl = (RecipientList) list.get(0);
+ assertNotNull(rl);
+
+ Object pc =
ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
rl);
+ assertNotNull(pc);
+ ExecutorService es = assertIsInstanceOf(ExecutorService.class, pc);
+
+ assertFalse(es.isShutdown());
+
+ // now stop camel and ensure the thread pool is stopped
+ context.stop();
+ assertTrue(es.isShutdown());
}
@Override
@@ -51,7 +71,7 @@ public class RecipientListParallelTimeoutTest extends
ContextTestSupport {
oldExchange.getIn().setBody(body +
newExchange.getIn().getBody(String.class));
return oldExchange;
}
- }).parallelProcessing().timeout(500).to("mock:result");
+
}).parallelProcessing().timeout(500).id("foo").to("mock:result");
from("direct:a").delay(1000).setBody(constant("A"));