Author: davsclaus
Date: Sat Jan 8 17:29:00 2011
New Revision: 1056744
URL: http://svn.apache.org/viewvc?rev=1056744&view=rev
Log:
CAMEL-3497: Fixed splitter using too much memory when using parallel mode. It now
aggregates on the fly, which keeps memory consumption low and makes it run
much faster overall.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
- copied, changed from r1056651,
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056744&r1=1056743&r2=1056744&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Sat Jan 8 17:29:00 2011
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -58,6 +59,7 @@ import org.apache.camel.util.KeyValueHol
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.apache.commons.logging.Log;
@@ -143,6 +145,7 @@ public class MulticastProcessor extends
private final boolean streaming;
private final boolean stopOnException;
private final ExecutorService executorService;
+ private ExecutorService aggregationExecutorService;
private final long timeout;
private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers
= new ConcurrentHashMap<PreparedErrorHandler, Processor>();
@@ -229,10 +232,9 @@ public class MulticastProcessor extends
}
protected void doProcessParallel(final Exchange original, final
AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
- final boolean streaming, final
AsyncCallback callback) throws InterruptedException, ExecutionException {
- final CompletionService<Exchange> completion;
- final AtomicBoolean running = new AtomicBoolean(true);
+ final boolean streaming, final
AsyncCallback callback) throws Exception {
+ final CompletionService<Exchange> completion;
if (streaming) {
// execute tasks in parallel+streaming and aggregate in the order
they are finished (out of order sequence)
completion = new
ExecutorCompletionService<Exchange>(executorService);
@@ -241,9 +243,28 @@ public class MulticastProcessor extends
completion = new
SubmitOrderedCompletionService<Exchange>(executorService);
}
+ // when parallel then aggregate on the fly
+ final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger total = new AtomicInteger(0);
+ final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
+ final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
+ final AtomicException executionException = new AtomicException();
final Iterator<ProcessorExchangePair> it = pairs.iterator();
+
+ if (it.hasNext()) {
+ // issue task to execute in separate thread so it can aggregate
on-the-fly
+ // while we submit new tasks, and those tasks complete concurrently
+ // this allows us to optimize work and reduce memory consumption
+ AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result,
original, total, completion, running,
+ aggregationOnTheFlyDone, allTasksSubmitted,
executionException);
+
+ // and start the task using the aggregation execution service
+ aggregationExecutorService.submit(task);
+ } else {
+ // there is nothing to process and thus nothing to aggregate, so no aggregation task
+ // will ever count down the latch; open it right away or the await below blocks forever
+ aggregationOnTheFlyDone.countDown();
+ }
+
+ LOG.trace("Starting to submit parallel tasks");
+
while (it.hasNext()) {
final ProcessorExchangePair pair = it.next();
final Exchange subExchange = pair.getExchange();
@@ -285,90 +306,181 @@ public class MulticastProcessor extends
total.incrementAndGet();
}
- // TODO: in streaming mode we need to aggregate on-the-fly
+ // signal that all tasks have been submitted, so the on-the-fly aggregation
+ // can tell when it has aggregated the last response
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Signaling that all " + total.get() + " tasks has been
submitted.");
+ }
+ allTasksSubmitted.set(true);
- // its to hard to do parallel async routing so we let the caller
thread be synchronously
- // and have it pickup the replies and do the aggregation
- boolean timedOut = false;
- boolean stoppedOnException = false;
- final StopWatch watch = new StopWatch();
- for (int i = 0; i < total.intValue(); i++) {
- Future<Exchange> future;
- if (timedOut) {
- // we are timed out but try to grab if some tasks has been
completed
- // poll will return null if no tasks is present
- future = completion.poll();
- } else if (timeout > 0) {
- long left = timeout - watch.taken();
- if (left < 0) {
- left = 0;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Polling completion task #" + i + " using
timeout " + left + " millis.");
- }
- future = completion.poll(left, TimeUnit.MILLISECONDS);
- } else {
- // take will wait until the task is complete
- if (LOG.isTraceEnabled()) {
- LOG.trace("Polling completion task #" + i);
+ // wait for the on-the-fly aggregation task to signal it is done
+ // (note: this waits without a timeout)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for on-the-fly aggregation to complete
aggregating " + total.get() + " responses.");
+ }
+ aggregationOnTheFlyDone.await();
+
+ // if the aggregation failed for whatever reason, rethrow the exception it recorded
+ if (executionException.get() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Parallel processing failed due " +
executionException.get().getMessage());
+ }
+ throw executionException.get();
+ }
+
+ // now everything is okay so we are done
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done parallel processing " + total + " exchanges");
+ }
+ }
+
+ /**
+ * Task to aggregate on-the-fly for completed tasks when using parallel
processing.
+ * <p/>
+ * This ensures lower memory consumption as we do not need to keep all
completed tasks in memory
+ * before we perform aggregation. Instead this separate thread will run
and aggregate as new
+ * tasks complete.
+ * <p/>
+ * The logic is fairly complex as this implementation has to keep track
how far it got, and also
+ * signal back to the <i>main</i> thread when it's done, so the <i>main</i>
thread can continue
+ * processing when the entire splitting is done.
+ */
+ private final class AggregateOnTheFlyTask implements Runnable {
+
+ // all state below is handed over by doProcessParallel and shared with
+ // the thread that submits the tasks
+ private final AtomicExchange result;
+ private final Exchange original;
+ private final AtomicInteger total;
+ private final CompletionService<Exchange> completion;
+ private final AtomicBoolean running;
+ private final CountDownLatch aggregationOnTheFlyDone;
+ private final AtomicBoolean allTasksSubmitted;
+ private final AtomicException executionException;
+
+ private AggregateOnTheFlyTask(AtomicExchange result, Exchange
original, AtomicInteger total,
+ CompletionService<Exchange> completion,
AtomicBoolean running,
+ CountDownLatch aggregationOnTheFlyDone,
AtomicBoolean allTasksSubmitted,
+ AtomicException executionException) {
+ this.result = result;
+ this.original = original;
+ this.total = total;
+ this.completion = completion;
+ this.running = running;
+ this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
+ this.allTasksSubmitted = allTasksSubmitted;
+ this.executionException = executionException;
+ }
+
+ public void run() {
+ LOG.trace("Aggregate on the fly task +++ started +++");
+
+ try {
+ aggregateOnTheFly();
+ } catch (Throwable e) {
+ // do not let the failure escape this thread; store it so the thread waiting on
+ // aggregationOnTheFlyDone can rethrow it (non-Exception throwables are wrapped)
+ if (e instanceof Exception) {
+ executionException.set((Exception) e);
+ } else {
+
executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
}
- future = completion.take();
}
- if (future == null && timedOut) {
- // we are timed out and no more tasks complete so break out
- break;
- } else if (future == null) {
- // timeout occurred
- AggregationStrategy strategy = getAggregationStrategy(null);
- if (strategy instanceof TimeoutAwareAggregationStrategy) {
- // notify the strategy we timed out
- Exchange oldExchange = result.get();
- if (oldExchange == null) {
- // if they all timed out the result may not have been
set yet, so use the original exchange
- oldExchange = original;
+ // must signal we are done so the latch can open and let the other
thread continue processing
+ LOG.trace("Signaling we are done aggregating on the fly");
+ aggregationOnTheFlyDone.countDown();
+
+ LOG.trace("Aggregate on the fly task +++ done +++");
+ }
+
+ private void aggregateOnTheFly() throws InterruptedException,
ExecutionException {
+ // Runs in the aggregation thread: drains the completion service while the submitting
+ // thread may still be adding tasks, aggregating each result as soon as it completes.
+ // Every blocking wait is bounded by maxPollMillis so the loop regularly rechecks whether
+ // everything has been submitted and aggregated. The last result may be aggregated before
+ // the submitting thread sets allTasksSubmitted, and an unbounded take() would then block
+ // forever (and with it the submitting thread waiting on aggregationOnTheFlyDone).
+ final long maxPollMillis = 100;
+ boolean timedOut = false;
+ boolean stoppedOnException = false;
+ final StopWatch watch = new StopWatch();
+ int aggregated = 0;
+ // not a for loop as tasks may still be submitted while we aggregate on the fly
+ while (true) {
+ // check if we have already aggregated everything
+ if (allTasksSubmitted.get() && aggregated >= total.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done aggregating " + aggregated + "
exchanges on the fly.");
+ }
+ break;
+ }
+
+ Future<Exchange> future;
+ if (timedOut) {
+ // we are timed out but try to grab if some tasks has been
completed
+ // poll will return null if no tasks is present
+ future = completion.poll();
+ } else if (timeout > 0) {
+ long left = timeout - watch.taken();
+ if (left < 0) {
+ left = 0;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling completion task #" + aggregated + "
using timeout " + left + " millis.");
}
- ((TimeoutAwareAggregationStrategy)
strategy).timeout(oldExchange, i, total.intValue(), timeout);
+ future = completion.poll(Math.min(left, maxPollMillis), TimeUnit.MILLISECONDS);
+ if (future == null && (watch.taken() < timeout || (allTasksSubmitted.get() && aggregated >= total.get()))) {
+ // not timed out yet, or everything is already aggregated (so this is not a
+ // real timeout): loop around and recheck
+ continue;
+ }
} else {
- // log a WARN we timed out since it will not be aggregated
and the Exchange will be lost
- LOG.warn("Parallel processing timed out after " + timeout
+ " millis for number " + i + ". This task will be cancelled and will not be
aggregated.");
+ // bounded poll rather than take, so we notice when all tasks are done
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling completion task #" + aggregated);
+ }
+ future = completion.poll(maxPollMillis, TimeUnit.MILLISECONDS);
+ if (future == null) {
+ // nothing completed within the poll interval, loop around and recheck if we are done
+ continue;
+ }
}
- timedOut = true;
- } else {
- // there is a result to aggregate
- Exchange subExchange = future.get();
- // Decide whether to continue with the multicast or not;
similar logic to the Pipeline
- Integer number = getExchangeIndex(subExchange);
- boolean continueProcessing =
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for
number " + number, LOG);
- if (stopOnException && !continueProcessing) {
- // we want to stop on exception and an exception or
failure occurred
- // this is similar to what the pipeline does, so we should
do the same to not surprise end users
- // so we should set the failed exchange as the result and
break out
- result.set(subExchange);
- stoppedOnException = true;
+ if (future == null && timedOut) {
+ // we are timed out and no more tasks complete so break out
break;
+ } else if (future == null) {
+ // timeout occurred
+ AggregationStrategy strategy =
getAggregationStrategy(null);
+ if (strategy instanceof TimeoutAwareAggregationStrategy) {
+ // notify the strategy we timed out
+ Exchange oldExchange = result.get();
+ if (oldExchange == null) {
+ // if they all timed out the result may not have
been set yet, so use the original exchange
+ oldExchange = original;
+ }
+ ((TimeoutAwareAggregationStrategy)
strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
+ } else {
+ // log a WARN we timed out since it will not be
aggregated and the Exchange will be lost
+ LOG.warn("Parallel processing timed out after " +
timeout + " millis for number " + aggregated + ". This task will be cancelled
and will not be aggregated.");
+ }
+ timedOut = true;
+ } else {
+ // there is a result to aggregate
+ Exchange subExchange = future.get();
+
+ // Decide whether to continue with the multicast or not;
similar logic to the Pipeline
+ Integer number = getExchangeIndex(subExchange);
+ boolean continueProcessing =
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for
number " + number, LOG);
+ if (stopOnException && !continueProcessing) {
+ // we want to stop on exception and an exception or
failure occurred
+ // this is similar to what the pipeline does, so we
should do the same to not surprise end users
+ // so we should set the failed exchange as the result
and break out
+ result.set(subExchange);
+ stoppedOnException = true;
+ break;
+ }
+
+ // we got a result so aggregate it
+ AggregationStrategy strategy =
getAggregationStrategy(subExchange);
+ doAggregate(strategy, result, subExchange);
}
- // we got a result so aggregate it
- AggregationStrategy strategy =
getAggregationStrategy(subExchange);
- doAggregate(strategy, result, subExchange);
+ aggregated++;
}
- }
- if (timedOut || stoppedOnException) {
- if (timedOut && LOG.isDebugEnabled()) {
- LOG.debug("Cancelling tasks due timeout after " + timeout + "
millis.");
- }
- if (stoppedOnException && LOG.isDebugEnabled()) {
- LOG.debug("Cancelling tasks due stopOnException.");
+ if (timedOut || stoppedOnException) {
+ if (timedOut && LOG.isDebugEnabled()) {
+ LOG.debug("Cancelling tasks due timeout after " + timeout
+ " millis.");
+ }
+ if (stoppedOnException && LOG.isDebugEnabled()) {
+ LOG.debug("Cancelling tasks due stopOnException.");
+ }
+ // cancel tasks as we timed out (its safe to cancel done tasks)
+ running.set(false);
}
- // cancel tasks as we timed out (its safe to cancel done tasks)
- running.set(false);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done parallel processing " + total + " exchanges");
}
}
@@ -754,6 +866,9 @@ public class MulticastProcessor extends
if (timeout > 0 && !isParallelProcessing()) {
throw new IllegalArgumentException("Timeout is used but
ParallelProcessing has not been enabled");
}
+ if (isParallelProcessing()) {
+ aggregationExecutorService =
getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this,
"AggregationTask");
+ }
ServiceHelper.startServices(processors);
}
Copied:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
(from r1056651,
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java&r1=1056651&r2=1056744&rev=1056744&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
Sat Jan 8 17:29:00 2011
@@ -18,12 +18,10 @@ package org.apache.camel.util.concurrent
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.camel.Exchange;
-
/**
- * Convenience class for holding an {@link Exchange} in a thread-safe way
+ * Convenience class for holding an {@link Exception} in a thread-safe way.
+ * <p/>
+ * For example used to hand a failure raised in a worker thread back to the
+ * thread that waits for that worker and rethrows it.
*/
@SuppressWarnings("serial")
-public class AtomicExchange extends AtomicReference<Exchange> {
+public class AtomicException extends AtomicReference<Exception> {
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java?rev=1056744&r1=1056743&r2=1056744&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
Sat Jan 8 17:29:00 2011
@@ -84,7 +84,9 @@ public class SplitterParallelBigFileTest
from("file:target/split")
.split(body().tokenize("\n")).streaming().parallelProcessing()
- .to("log:split?groupSize=1000");
+ .to("log:split?groupSize=1000")
+ .end()
+ .log("Done splitting ${file:name}");
}
};
}