Author: davsclaus
Date: Sat Apr  9 11:09:28 2011
New Revision: 1090564

URL: http://svn.apache.org/viewvc?rev=1090564&view=rev
Log:
CAMEL-3850: splitter aggregate on-the-fly task is now more responsive in 
parallel mode.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1090564&r1=1090563&r2=1090564&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Sat Apr  9 11:09:28 2011
@@ -258,11 +258,9 @@ public class MulticastProcessor extends 
             // issue task to execute in separate thread so it can aggregate 
on-the-fly
             // while we submit new tasks, and those tasks complete concurrently
             // this allows us to optimize work and reduce memory consumption
-            AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, 
original, total, completion, running,
+            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new 
AggregateOnTheFlyTask(result, original, total, completion, running,
                     aggregationOnTheFlyDone, allTasksSubmitted, 
executionException);
-
-            // and start the aggregation task so we can aggregate on-the-fly
-            aggregateExecutorService.submit(task);
+            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();
 
             LOG.trace("Starting to submit parallel tasks");
 
@@ -273,6 +271,13 @@ public class MulticastProcessor extends 
 
                 completion.submit(new Callable<Exchange>() {
                     public Exchange call() throws Exception {
+                        // only start the aggregation task when the task is 
being executed to avoid starting
+                        // the aggregation task too early and piling up too many 
threads
+                        if (aggregationTaskSubmitted.compareAndSet(false, 
true)) {
+                            // but only submit the task once
+                            
aggregateExecutorService.submit(aggregateOnTheFlyTask);
+                        }
+
                         if (!running.get()) {
                             // do not start processing the task if we are not 
running
                             return subExchange;
@@ -312,7 +317,7 @@ public class MulticastProcessor extends 
             // its to hard to do parallel async routing so we let the caller 
thread be synchronously
             // and have it pickup the replies and do the aggregation (eg we 
use a latch to wait)
             // wait for aggregation to be done
-            LOG.debug("Waiting for on-the-fly aggregation to complete 
aggregating {} responses.", total.get());
+            LOG.debug("Waiting for on-the-fly aggregation to complete 
aggregating {} responses for exchangeId: {}", total.get(), 
original.getExchangeId());
             aggregationOnTheFlyDone.await();
 
             // did we fail for whatever reason, if so throw that caused 
exception
@@ -365,7 +370,7 @@ public class MulticastProcessor extends 
         }
 
         public void run() {
-            LOG.trace("Aggregate on the fly task +++ started +++");
+            LOG.trace("Aggregate on the fly task started for exchangeId: {}", 
original.getExchangeId());
 
             try {
                 aggregateOnTheFly();
@@ -377,8 +382,8 @@ public class MulticastProcessor extends 
                 }
             } finally {
                 // must signal we are done so the latch can open and let the 
other thread continue processing
-                LOG.debug("Signaling we are done aggregating on the fly");
-                LOG.trace("Aggregate on the fly task +++ done +++");
+                LOG.debug("Signaling we are done aggregating on the fly for 
exchangeId: {}", original.getExchangeId());
+                LOG.trace("Aggregate on the fly task done for exchangeId: {}", 
original.getExchangeId());
                 aggregationOnTheFlyDone.countDown();
             }
         }
@@ -436,7 +441,7 @@ public class MulticastProcessor extends 
                         ((TimeoutAwareAggregationStrategy) 
strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
                     } else {
                         // log a WARN we timed out since it will not be 
aggregated and the Exchange will be lost
-                        LOG.warn("Parallel processing timed out after " + 
timeout + " millis for number " + aggregated + ". This task will be cancelled 
and will not be aggregated.");
+                        LOG.warn("Parallel processing timed out after {} 
millis for number {}. This task will be cancelled and will not be aggregated.", 
timeout, aggregated);
                     }
                     LOG.debug("Timeout occurred after {} millis for number {} 
task.", timeout, aggregated);
                     timedOut = true;
@@ -856,9 +861,7 @@ public class MulticastProcessor extends 
         if (isParallelProcessing() && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly 
task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the 
aggregate task may not be able to run
-            // and signal completion during processing, which would lead to a 
dead-lock
-            // keep at least one thread in the pool so we re-use the thread 
avoiding to create new threads because
-            // the pool shrank to zero.
+            // and signal completion during processing, which would lead to 
what would appear as a dead-lock or slow processing
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
         }
@@ -873,7 +876,8 @@ public class MulticastProcessor extends 
      * @return the thread pool
      */
     protected synchronized ExecutorService 
createAggregateExecutorService(String name) {
-        return camelContext.getExecutorServiceStrategy().newThreadPool(this, 
name, 1, Integer.MAX_VALUE);
+        // use a cached thread pool so each on-the-fly task has a dedicated 
thread to process completions as they come in
+        return 
camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, name);
     }
 
     protected void doStop() throws Exception {

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java?rev=1090564&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
 Sat Apr  9 11:09:28 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
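+/**
+ * CAMEL-3850: sends messages from concurrent threads through a parallel, streaming splitter and expects all of them at mock:end.
+ */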
+public class SplitterParallelIssueTest extends ContextTestSupport {
+
+    private int size = 20;
+    private int delay = 100;
+
+    public void testSplitParallel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:end");
+        mock.expectedMessageCount(size);
+        int time = Math.max(10000, size * 2 * delay);
+        mock.setResultWaitTime(time);
+
+        for (int i = 0; i < size; i++) {
+            final int num = i;
+            new Thread(new Runnable() {
+                public void run() {
+                    template.sendBody("direct:start", "" + num);
+                }
+            }).start();
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .log("Start ${body}")
+                        .split(body().tokenize("@"), new 
UseLatestAggregationStrategy()).parallelProcessing().streaming()
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                int num = exchange.getIn().getBody(int.class);
+                                final long sleep = (long) (num * delay);
+                                log.info("Sleep for " + sleep + "ms");
+                                Thread.sleep(sleep);
+                            }
+                        })
+                        .end()
+                        .log("End ${body}")
+                        .to("mock:end");
+            }
+        };
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelIssueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to