Author: davsclaus Date: Thu Jul 1 09:36:35 2010 New Revision: 959570 URL: http://svn.apache.org/viewvc?rev=959570&view=rev Log: CAMEL-2887: The Enricher DSL now supports the async routing engine.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java - copied, changed from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=959570&r1=959569&r2=959570&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Jul 1 09:36:35 2010 @@ -16,15 +16,20 @@ */ package org.apache.camel.processor; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; @@ -40,8 +45,9 @@ import static org.apache.camel.util.Exch * * @see PollEnricher */ -public class Enricher extends ServiceSupport implements Processor { +public class Enricher extends ServiceSupport implements AsyncProcessor { + private static final transient Log LOG = LogFactory.getLog(Enricher.class); private AggregationStrategy 
aggregationStrategy; private Producer producer; @@ -84,6 +90,10 @@ public class Enricher extends ServiceSup this.aggregationStrategy = defaultAggregationStrategy(); } + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + /** * Enriches the input data (<code>exchange</code>) by first obtaining * additional data from an endpoint represented by an endpoint @@ -93,12 +103,51 @@ public class Enricher extends ServiceSup * message exchange with the resource endpoint fails then no aggregation * will be done and the failed exchange content is copied over to the * original message exchange. - * + * * @param exchange input data. */ - public void process(Exchange exchange) throws Exception { - Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); - producer.process(resourceExchange); + public boolean process(final Exchange exchange, final AsyncCallback callback) { + final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); + + AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer); + boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() { + public void done(boolean doneSync) { + // we only have to handle async completion of the routing slip + if (doneSync) { + return; + } + + if (resourceExchange.isFailed()) { + // copy resource exchange onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, resourceExchange); + } else { + prepareResult(exchange); + + // prepare the exchanges for aggregation + ExchangeHelper.prepareAggregation(exchange, resourceExchange); + Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); + if (aggregatedExchange != null) { + // copy aggregation result onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, aggregatedExchange); + } + } + + callback.done(false); + } + }); + + if (!sync) { + if 
(LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed asynchronously"); + } + // the remainder of the routing slip will be completed async + // so we break out now, then the callback will be invoked which then continue routing from where we left here + return false; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " is continued being processed synchronously"); + } if (resourceExchange.isFailed()) { // copy resource exchange onto original exchange (preserving pattern) @@ -114,6 +163,9 @@ public class Enricher extends ServiceSup copyResultsPreservePattern(exchange, aggregatedExchange); } } + + callback.done(true); + return true; } /** Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java (from r959564, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=959564&r2=959570&rev=959570&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java Thu Jul 1 09:36:35 2010 @@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui /** * @version $Revision$ */ -public class AsyncEndpointTest extends ContextTestSupport { +public class AsyncEndpointEnricherTest extends ContextTestSupport { private static String beforeThreadName; private static String afterThreadName; @@ -57,7 +57,7 @@ public 
class AsyncEndpointTest extends C beforeThreadName = Thread.currentThread().getName(); } }) - .to("async:Bye Camel") + .enrich("async:Bye Camel") .process(new Processor() { public void process(Exchange exchange) throws Exception { afterThreadName = Thread.currentThread().getName(); @@ -70,4 +70,4 @@ public class AsyncEndpointTest extends C }; } -} +} \ No newline at end of file