Repository: camel Updated Branches: refs/heads/master 509f4ea5d -> 4ffc5e258
CAMEL-7581: Added aggregateOnException option to enrich/pollEnrich so end users can handle exception in their aggregte method. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16e193c8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16e193c8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16e193c8 Branch: refs/heads/master Commit: 16e193c85a68e1c61be9eb0354eb6d962a31306c Parents: 509f4ea Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Sep 1 10:28:57 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Sep 1 10:28:57 2014 +0200 ---------------------------------------------------------------------- .../apache/camel/model/EnrichDefinition.java | 13 ++ .../camel/model/PollEnrichDefinition.java | 13 ++ .../apache/camel/model/ProcessorDefinition.java | 100 +++++++++++++++ .../org/apache/camel/processor/Enricher.java | 21 +++- .../apache/camel/processor/PollEnricher.java | 29 ++++- .../EnricherAggregateOnExceptionTest.java | 116 ++++++++++++++++++ .../PollEnricherAggregateOnExceptionTest.java | 122 +++++++++++++++++++ 7 files changed, 406 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 64935bb..789f84b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -50,6 +50,8 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple private String aggregationStrategyMethodName; @XmlAttribute(name = "strategyMethodAllowNull") private Boolean aggregationStrategyMethodAllowNull; + @XmlAttribute + private Boolean aggregateOnException; @XmlTransient private AggregationStrategy aggregationStrategy; @@ -115,6 +117,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple } else { enricher.setAggregationStrategy(strategy); } + if (getAggregateOnException() != null) { + enricher.setAggregateOnException(getAggregateOnException()); + } return enricher; } @@ -190,4 +195,12 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } + + public Boolean getAggregateOnException() { + return aggregateOnException; + } + + public void setAggregateOnException(Boolean aggregateOnException) { + this.aggregateOnException = aggregateOnException; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index 3ddec0c..ac5a323 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -52,6 +52,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio private String aggregationStrategyMethodName; @XmlAttribute(name = "strategyMethodAllowNull") private Boolean aggregationStrategyMethodAllowNull; + @XmlAttribute + private Boolean aggregateOnException; @XmlTransient private AggregationStrategy aggregationStrategy; @@ -120,6 +122,9 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio } else { enricher.setAggregationStrategy(strategy); } + if (getAggregateOnException() != null) { + enricher.setAggregateOnException(getAggregateOnException()); + } return enricher; } @@ -204,4 +209,12 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } + + public Boolean getAggregateOnException() { + return aggregateOnException; + } + + public void setAggregateOnException(Boolean aggregateOnException) { + this.aggregateOnException = aggregateOnException; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 6af6ca6..af44b18 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -2920,6 +2920,23 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + @SuppressWarnings("unchecked") + public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { + EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri); + enrich.setAggregateOnException(aggregateOnException); + addOutput(enrich); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * <p/> * The difference between this and {@link #pollEnrich(String)} is that this uses a producer * to obatin the additional data, where as pollEnrich uses a polling consumer. @@ -2957,6 +2974,30 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * <p/> + * The difference between this and {@link #pollEnrich(String)} is that this uses a producer + * to obtain the additional data, where as pollEnrich uses a polling consumer. + * + * @param resourceRef Reference of resource endpoint for obtaining additional data. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + @SuppressWarnings("unchecked") + public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) { + EnrichDefinition enrich = new EnrichDefinition(); + enrich.setResourceRef(resourceRef); + enrich.setAggregationStrategyRef(aggregationStrategyRef); + enrich.setAggregateOnException(aggregateOnException); + addOutput(enrich); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code> * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. * <p/> @@ -3036,6 +3077,34 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @param resourceUri URI of resource endpoint for obtaining additional data. * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + @SuppressWarnings("unchecked") + public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { + PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout); + pollEnrich.setAggregateOnException(aggregateOnException); + addOutput(pollEnrich); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. * @return the builder * @see org.apache.camel.processor.PollEnricher */ @@ -3074,6 +3143,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param resourceRef Reference of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + @SuppressWarnings("unchecked") + public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { + PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); + pollEnrich.setResourceRef(resourceRef); + pollEnrich.setTimeout(timeout); + pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); + pollEnrich.setAggregateOnException(aggregateOnException); + addOutput(pollEnrich); + return (Type) this; + } + + /** * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as * a callback when the {@link org.apache.camel.Exchange} has finished being processed. * The hook invoke callbacks for either onComplete or onFailure. http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index 72faa12..9ab0aa9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -53,6 +53,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(Enricher.class); private AggregationStrategy aggregationStrategy; private Producer producer; + private boolean aggregateOnException; /** * Creates a new {@link Enricher}. The default aggregation strategy is to @@ -86,6 +87,22 @@ public class Enricher extends ServiceSupport implements AsyncProcessor { this.aggregationStrategy = aggregationStrategy; } + public AggregationStrategy getAggregationStrategy() { + return aggregationStrategy; + } + + public boolean isAggregateOnException() { + return aggregateOnException; + } + + /** + * Whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. + */ + public void setAggregateOnException(boolean aggregateOnException) { + this.aggregateOnException = aggregateOnException; + } + /** * Sets the default aggregation strategy for this enricher. */ @@ -128,7 +145,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor { long timeTaken = watch.stop(); EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); - if (resourceExchange.isFailed()) { + if (!isAggregateOnException() && resourceExchange.isFailed()) { // copy resource exchange onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, resourceExchange); } else { @@ -171,7 +188,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor { long timeTaken = watch.stop(); EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken); - if (resourceExchange.isFailed()) { + if (!isAggregateOnException() && resourceExchange.isFailed()) { // copy resource exchange onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, resourceExchange); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 34d445a..7960d3d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -49,6 +49,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { private AggregationStrategy aggregationStrategy; private PollingConsumer consumer; private long timeout; + private boolean aggregateOnException; /** * Creates a new {@link PollEnricher}. The default aggregation strategy is to @@ -75,6 +76,10 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { this.timeout = timeout; } + public AggregationStrategy getAggregationStrategy() { + return aggregationStrategy; + } + /** * Sets the aggregation strategy for this poll enricher. * @@ -84,11 +89,8 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { this.aggregationStrategy = aggregationStrategy; } - /** - * Sets the default aggregation strategy for this poll enricher. - */ - public void setDefaultAggregationStrategy() { - this.aggregationStrategy = defaultAggregationStrategy(); + public long getTimeout() { + return timeout; } /** @@ -103,6 +105,21 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { this.timeout = timeout; } + public boolean isAggregateOnException() { + return aggregateOnException; + } + + public void setAggregateOnException(boolean aggregateOnException) { + this.aggregateOnException = aggregateOnException; + } + + /** + * Sets the default aggregation strategy for this poll enricher. + */ + public void setDefaultAggregationStrategy() { + this.aggregationStrategy = defaultAggregationStrategy(); + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -147,7 +164,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor { LOG.debug("Consumer received: {}", resourceExchange); } - if (resourceExchange != null && resourceExchange.isFailed()) { + if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) { // copy resource exchange onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, resourceExchange); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateOnExceptionTest.java new file mode 100644 index 0000000..4353a66 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherAggregateOnExceptionTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version + */ +public class EnricherAggregateOnExceptionTest extends ContextTestSupport { + + public void testEnrichTrueOk() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "World"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichTrueKaboom() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("I cannot do this"); + + template.sendBody("direct:start", "Kaboom"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichFalseOk() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start2", "World"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichFalseKaboom() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + try { + template.sendBody("direct:start2", "Kaboom"); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("I cannot do this", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrich("direct:foo", new MyAggregationStrategy(), true) + .to("mock:result"); + + from("direct:start2") + .enrich("direct:foo", new MyAggregationStrategy(), false) + .to("mock:result"); + + from("direct:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + if (body.startsWith("Kaboom")) { + throw new IllegalArgumentException("I cannot do this"); + } + exchange.getIn().setBody("Hello " + body); + } + }); + } + }; + } + + private class MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (newExchange.getException() != null) { + oldExchange.getIn().setBody(newExchange.getException().getMessage()); + return oldExchange; + } + + // replace body + oldExchange.getIn().setBody(newExchange.getIn().getBody()); + return oldExchange; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/16e193c8/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherAggregateOnExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherAggregateOnExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherAggregateOnExceptionTest.java new file mode 100644 index 0000000..a53ce27 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherAggregateOnExceptionTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version + */ +public class PollEnricherAggregateOnExceptionTest extends ContextTestSupport { + + public void testEnrichTrueOk() throws Exception { + template.sendBody("seda:foo", "Hello World"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "World"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichTrueKaboom() throws Exception { + template.send("seda:foo", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new IllegalArgumentException("I cannot do this")); + } + }); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("I cannot do this"); + + template.sendBody("direct:start", "Kaboom"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichFalseOk() throws Exception { + template.sendBody("seda:foo", "Hello World"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start2", "World"); + + assertMockEndpointsSatisfied(); + } + + public void testEnrichFalseKaboom() throws Exception { + template.send("seda:foo", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new IllegalArgumentException("I cannot do this")); + } + }); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + + try { + template.sendBody("direct:start2", "Kaboom"); + fail("Should have thrown exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("I cannot do this", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pollEnrich("seda:foo", 5000, new MyAggregationStrategy(), true) + .to("mock:result"); + + from("direct:start2") + .pollEnrich("seda:foo", 5000, new MyAggregationStrategy(), false) + .to("mock:result"); + } + }; + } + + private class MyAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (newExchange.getException() != null) { + oldExchange.getIn().setBody(newExchange.getException().getMessage()); + return oldExchange; + } + + // replace body + oldExchange.getIn().setBody(newExchange.getIn().getBody()); + return oldExchange; + } + } +}