Author: davsclaus
Date: Sat Apr 9 11:09:28 2011
New Revision: 1090564
URL: http://svn.apache.org/viewvc?rev=1090564&view=rev
Log:
CAMEL-3850: splitter aggregate on-the-fly task is now more responsive in
parallel mode.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1090564&r1=1090563&r2=1090564&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Sat Apr 9 11:09:28 2011
@@ -258,11 +258,9 @@ public class MulticastProcessor extends
// issue task to execute in separate thread so it can aggregate
on-the-fly
// while we submit new tasks, and those tasks complete concurrently
// this allows us to optimize work and reduce memory consumption
- AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result,
original, total, completion, running,
+ final AggregateOnTheFlyTask aggregateOnTheFlyTask = new
AggregateOnTheFlyTask(result, original, total, completion, running,
aggregationOnTheFlyDone, allTasksSubmitted,
executionException);
-
- // and start the aggregation task so we can aggregate on-the-fly
- aggregateExecutorService.submit(task);
+ final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
LOG.trace("Starting to submit parallel tasks");
@@ -273,6 +271,13 @@ public class MulticastProcessor extends
completion.submit(new Callable<Exchange>() {
public Exchange call() throws Exception {
+ // only start the aggregation task when the task is
being executed to avoid staring
+ // the aggregation task to early and pile up too many
threads
+ if (aggregationTaskSubmitted.compareAndSet(false,
true)) {
+ // but only submit the task once
+
aggregateExecutorService.submit(aggregateOnTheFlyTask);
+ }
+
if (!running.get()) {
// do not start processing the task if we are not
running
return subExchange;
@@ -312,7 +317,7 @@ public class MulticastProcessor extends
// its to hard to do parallel async routing so we let the caller
thread be synchronously
// and have it pickup the replies and do the aggregation (eg we
use a latch to wait)
// wait for aggregation to be done
- LOG.debug("Waiting for on-the-fly aggregation to complete
aggregating {} responses.", total.get());
+ LOG.debug("Waiting for on-the-fly aggregation to complete
aggregating {} responses for exchangeId: {}", total.get(),
original.getExchangeId());
aggregationOnTheFlyDone.await();
// did we fail for whatever reason, if so throw that caused
exception
@@ -365,7 +370,7 @@ public class MulticastProcessor extends
}
public void run() {
- LOG.trace("Aggregate on the fly task +++ started +++");
+ LOG.trace("Aggregate on the fly task started for exchangeId: {}",
original.getExchangeId());
try {
aggregateOnTheFly();
@@ -377,8 +382,8 @@ public class MulticastProcessor extends
}
} finally {
// must signal we are done so the latch can open and let the
other thread continue processing
- LOG.debug("Signaling we are done aggregating on the fly");
- LOG.trace("Aggregate on the fly task +++ done +++");
+ LOG.debug("Signaling we are done aggregating on the fly for
exchangeId: {}", original.getExchangeId());
+ LOG.trace("Aggregate on the fly task done for exchangeId: {}",
original.getExchangeId());
aggregationOnTheFlyDone.countDown();
}
}
@@ -436,7 +441,7 @@ public class MulticastProcessor extends
((TimeoutAwareAggregationStrategy)
strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
} else {
// log a WARN we timed out since it will not be
aggregated and the Exchange will be lost
- LOG.warn("Parallel processing timed out after " +
timeout + " millis for number " + aggregated + ". This task will be cancelled
and will not be aggregated.");
+ LOG.warn("Parallel processing timed out after {}
millis for number {}. This task will be cancelled and will not be aggregated.",
timeout, aggregated);
}
LOG.debug("Timeout occurred after {} millis for number {}
task.", timeout, aggregated);
timedOut = true;
@@ -856,9 +861,7 @@ public class MulticastProcessor extends
if (isParallelProcessing() && aggregateExecutorService == null) {
// use unbounded thread pool so we ensure the aggregate on-the-fly
task always will have assigned a thread
// and run the tasks when the task is submitted. If not then the
aggregate task may not be able to run
- // and signal completion during processing, which would lead to a
dead-lock
- // keep at least one thread in the pool so we re-use the thread
avoiding to create new threads because
- // the pool shrank to zero.
+ // and signal completion during processing, which would lead to
what would appear as a dead-lock or a slow processing
String name = getClass().getSimpleName() + "-AggregateTask";
aggregateExecutorService = createAggregateExecutorService(name);
}
@@ -873,7 +876,8 @@ public class MulticastProcessor extends
* @return the thread pool
*/
protected synchronized ExecutorService
createAggregateExecutorService(String name) {
- return camelContext.getExecutorServiceStrategy().newThreadPool(this,
name, 1, Integer.MAX_VALUE);
+ // use a cached thread pool so we each on-the-fly task has a dedicated
thread to process completions as they come in
+ return
camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, name);
}
protected void doStop() throws Exception {
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java?rev=1090564&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
Sat Apr 9 11:09:28 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+/**
+ *
+ */
+public class SplitterParallelIssueTest extends ContextTestSupport {
+
+ private int size = 20;
+ private int delay = 100;
+
+ public void testSplitParallel() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:end");
+ mock.expectedMessageCount(size);
+ int time = Math.max(10000, size * 2 * delay);
+ mock.setResultWaitTime(time);
+
+ for (int i = 0; i < size; i++) {
+ final int num = i;
+ new Thread(new Runnable() {
+ public void run() {
+ template.sendBody("direct:start", "" + num);
+ }
+ }).start();
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .log("Start ${body}")
+ .split(body().tokenize("@"), new
UseLatestAggregationStrategy()).parallelProcessing().streaming()
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws
Exception {
+ int num = exchange.getIn().getBody(int.class);
+ final long sleep = (long) (num * delay);
+ log.info("Sleep for " + sleep + "ms");
+ Thread.sleep(sleep);
+ }
+ })
+ .end()
+ .log("End ${body}")
+ .to("mock:end");
+ }
+ };
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date