Repository: camel Updated Branches: refs/heads/master d14fa8c45 -> 01dead92e
CAMEL-8965: WireTap supports dynamic uris like enrich/pollEnrich now does. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2696b57b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2696b57b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2696b57b Branch: refs/heads/master Commit: 2696b57b831712548d10be1651e053685d6e25da Parents: d14fa8c Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jul 20 16:18:35 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jul 20 22:54:07 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/model/ProcessorDefinition.java | 88 ++++++++----- .../apache/camel/model/WireTapDefinition.java | 125 +++---------------- .../camel/processor/WireTapProcessor.java | 42 +++---- ...outeRemoveWireTapExplicitThreadPoolTest.java | 2 +- .../camel/processor/CBRWithWireTapTest.java | 4 +- .../camel/processor/WireTapCustomPool2Test.java | 2 +- .../camel/processor/WireTapNewExchangeTest.java | 2 +- .../processor/WireTapOnPrepareRefTest.java | 2 +- .../camel/processor/WireTapOnPrepareTest.java | 2 +- ...eTapUsingFireAndForgetCopyAsDefaultTest.java | 4 +- .../WireTapUsingFireAndForgetCopyTest.java | 4 +- .../FileWireTapWithXMLPayloadIssueTest.xml | 4 +- .../SpringAggregateFromWireTapTest.xml | 4 +- .../processor/SpringComplexBlockWithEndTest.xml | 4 +- .../processor/SpringWireTapNewExchangeTest.xml | 3 +- .../spring/processor/SpringWireTapTest.xml | 10 +- .../SpringWireTapUsingFireAndForgetCopyTest.xml | 7 +- .../SpringWireTapUsingFireAndForgetTest.xml | 7 +- .../spring/processor/WireTapOnPrepareTest.xml | 4 +- 19 files changed, 133 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 17ccb0e..91d0e6f 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -1471,6 +1471,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> addOutput(answer); return ExpressionClause.createAndSetExpression(answer); } + /** * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a> * Creates a loadbalance @@ -2255,14 +2256,28 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * destination gets a copy of the original message to avoid the processors * interfering with each other using {@link ExchangePattern#InOnly}. * + * @return the builder + */ + public ExpressionClause<WireTapDefinition> wireTap() { + WireTapDefinition answer = new WireTapDefinition(); + addOutput(answer); + return ExpressionClause.createAndSetExpression(answer); + } + + /** + * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> + * Sends messages to all its child outputs; so that each processor and + * destination gets a copy of the original message to avoid the processors + * interfering with each other using {@link ExchangePattern#InOnly}. + * * @param uri the destination * @return the builder */ - public WireTapDefinition<Type> wireTap(String uri) { - WireTapDefinition<Type> answer = new WireTapDefinition<Type>(); - answer.setUri(uri); + public Type wireTap(String uri) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); addOutput(answer); - return answer; + return (Type) this; } /** @@ -2275,15 +2290,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param executorService a custom {@link ExecutorService} to use as thread pool * for sending tapped exchanges * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, ExecutorService executorService) { - WireTapDefinition<Type> answer = new WireTapDefinition<Type>(); - answer.setUri(uri); + public Type wireTap(String uri, ExecutorService executorService) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); answer.setExecutorService(executorService); addOutput(answer); - return answer; + return (Type) this; } /** @@ -2296,15 +2309,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param executorServiceRef reference to lookup a custom {@link ExecutorService} * to use as thread pool for sending tapped exchanges * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, String executorServiceRef) { - WireTapDefinition<Type> answer = new WireTapDefinition<Type>(); - answer.setUri(uri); + public Type wireTap(String uri, String executorServiceRef) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); answer.setExecutorServiceRef(executorServiceRef); addOutput(answer); - return answer; + return (Type) this; } /** @@ -2318,10 +2329,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param uri the destination * @param body expression that creates the body to send * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, Expression body) { + public Type wireTap(String uri, Expression body) { return wireTap(uri, true, body); } @@ -2332,18 +2341,33 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @param uri the destination * @param copy whether or not use a copy of the original exchange or a new empty exchange + * @return the builder + */ + public Type wireTap(String uri, boolean copy) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); + answer.setCopy(copy); + addOutput(answer); + return (Type) this; + } + + /** + * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> + * Sends a new {@link org.apache.camel.Exchange} to the destination + * using {@link ExchangePattern#InOnly}. + * + * @param uri the destination + * @param copy whether or not use a copy of the original exchange or a new empty exchange * @param body expression that creates the body to send * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, boolean copy, Expression body) { - WireTapDefinition<Type> answer = new WireTapDefinition<Type>(); - answer.setUri(uri); + public Type wireTap(String uri, boolean copy, Expression body) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); answer.setCopy(copy); answer.setNewExchangeExpression(body); addOutput(answer); - return answer; + return (Type) this; } /** @@ -2357,10 +2381,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param uri the destination * @param processor processor preparing the new exchange to send * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, Processor processor) { + public Type wireTap(String uri, Processor processor) { return wireTap(uri, true, processor); } @@ -2373,16 +2395,14 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param copy whether or not use a copy of the original exchange or a new empty exchange * @param processor processor preparing the new exchange to send * @return the builder - * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0 */ - @Deprecated - public WireTapDefinition<Type> wireTap(String uri, boolean copy, Processor processor) { - WireTapDefinition<Type> answer = new WireTapDefinition<Type>(); - answer.setUri(uri); + public Type wireTap(String uri, boolean copy, Processor processor) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setExpression(new SimpleExpression(uri)); answer.setCopy(copy); answer.setNewExchangeProcessor(processor); addOutput(answer); - return answer; + return (Type) this; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java index 49631bb..ebe95b1 100644 --- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -27,12 +27,11 @@ import javax.xml.bind.annotation.XmlElementRef; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; -import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.Processor; -import org.apache.camel.Producer; import org.apache.camel.processor.CamelInternalProcessor; +import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.WireTapProcessor; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; @@ -44,15 +43,7 @@ import org.apache.camel.util.CamelContextHelper; @Metadata(label = "eip,endpoint,routing") @XmlRootElement(name = "wireTap") @XmlAccessorType(XmlAccessType.FIELD) -public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputDefinition<WireTapDefinition<Type>> - implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>>, EndpointRequiredDefinition { - @XmlAttribute - protected String uri; - @XmlAttribute - @Deprecated - protected String ref; - @XmlTransient - protected Endpoint endpoint; +public class WireTapDefinition extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<WireTapDefinition> { @XmlTransient private Processor newExchangeProcessor; @XmlAttribute(name = "processorRef") @@ -75,37 +66,17 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N public WireTapDefinition() { } - public WireTapDefinition(String uri) { - setUri(uri); - } - - public WireTapDefinition(Endpoint endpoint) { - setEndpoint(endpoint); - } - - @Override - public String getEndpointUri() { - if (uri != null) { - return uri; - } else if (endpoint != null) { - return endpoint.getEndpointUri(); - } else { - return null; - } - } - @Override public Processor createProcessor(RouteContext routeContext) throws Exception { // executor service is mandatory for wire tap boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true); - // create the producer to send to the wire tapped endpoint - Endpoint endpoint = resolveEndpoint(routeContext); - Producer producer = endpoint.createProducer(); + // create the send dynamic producer to send to the wire tapped endpoint + Processor dynamicTo = new SendDynamicProcessor(null, getExpression()); // create error handler we need to use for processing the wire tapped - Processor target = wrapInErrorHandler(routeContext, producer); + Processor target = wrapInErrorHandler(routeContext, dynamicTo); // and wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(target); @@ -114,7 +85,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N // is true bt default boolean isCopy = getCopy() == null || getCopy(); - WireTapProcessor answer = new WireTapProcessor(endpoint, internal, getPattern(), threadPool, shutdownThreadPool); + WireTapProcessor answer = new WireTapProcessor(getExpression(), internal, getPattern(), threadPool, shutdownThreadPool); answer.setCopy(isCopy); if (newExchangeProcessorRef != null) { newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class); @@ -147,37 +118,12 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N @Override public String toString() { - return "WireTap[" + description() + "]"; + return "WireTap[" + getExpression() + "]"; } - protected String description() { - return FromDefinition.description(getUri(), getRef(), getEndpoint()); - } - @Override public String getLabel() { - return "wireTap[" + description() + "]"; - } - - @Override - @SuppressWarnings("unchecked") - public Type end() { - // allow end() to return to previous type so you can continue in the DSL - return (Type) super.end(); - } - - @Override - public void addOutput(ProcessorDefinition<?> output) { - // add outputs on parent as this wiretap does not support outputs - getParent().addOutput(output); - } - - public Endpoint resolveEndpoint(RouteContext context) { - if (endpoint == null) { - return context.resolveEndpoint(getUri(), getRef()); - } else { - return endpoint; - } + return "wireTap[" + getExpression() + "]"; } // Fluent API @@ -190,7 +136,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * for sending tapped exchanges * @return the builder */ - public WireTapDefinition<Type> executorService(ExecutorService executorService) { + public WireTapDefinition executorService(ExecutorService executorService) { setExecutorService(executorService); return this; } @@ -202,7 +148,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * to use as thread pool for sending tapped exchanges * @return the builder */ - public WireTapDefinition<Type> executorServiceRef(String executorServiceRef) { + public WireTapDefinition executorServiceRef(String executorServiceRef) { setExecutorServiceRef(executorServiceRef); return this; } @@ -212,7 +158,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * * @return the builder */ - public WireTapDefinition<Type> copy() { + public WireTapDefinition copy() { setCopy(true); return this; } @@ -224,7 +170,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * if it is false camel will not copy the original exchange * @return the builder */ - public WireTapDefinition<Type> copy(boolean copy) { + public WireTapDefinition copy(boolean copy) { setCopy(copy); return this; } @@ -233,7 +179,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @deprecated will be removed in Camel 3.0 Instead use {@link #newExchangeBody(org.apache.camel.Expression)} */ @Deprecated - public WireTapDefinition<Type> newExchange(Expression expression) { + public WireTapDefinition newExchange(Expression expression) { return newExchangeBody(expression); } @@ -244,7 +190,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @return the builder * @see #newExchangeHeader(String, org.apache.camel.Expression) */ - public WireTapDefinition<Type> newExchangeBody(Expression expression) { + public WireTapDefinition newExchangeBody(Expression expression) { setNewExchangeExpression(expression); return this; } @@ -256,7 +202,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * be used for preparing the new exchange to send * @return the builder */ - public WireTapDefinition<Type> newExchangeRef(String ref) { + public WireTapDefinition newExchangeRef(String ref) { setNewExchangeProcessorRef(ref); return this; } @@ -268,7 +214,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @return the builder * @see #newExchangeHeader(String, org.apache.camel.Expression) */ - public WireTapDefinition<Type> newExchange(Processor processor) { + public WireTapDefinition newExchange(Processor processor) { setNewExchangeProcessor(processor); return this; } @@ -283,7 +229,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @param expression the expression setting the header value * @return the builder */ - public WireTapDefinition<Type> newExchangeHeader(String headerName, Expression expression) { + public WireTapDefinition newExchangeHeader(String headerName, Expression expression) { headers.add(new SetHeaderDefinition(headerName, expression)); return this; } @@ -296,7 +242,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @param onPrepare the processor * @return the builder */ - public WireTapDefinition<Type> onPrepare(Processor onPrepare) { + public WireTapDefinition onPrepare(Processor onPrepare) { setOnPrepare(onPrepare); return this; } @@ -309,44 +255,11 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} * @return the builder */ - public WireTapDefinition<Type> onPrepareRef(String onPrepareRef) { + public WireTapDefinition onPrepareRef(String onPrepareRef) { setOnPrepareRef(onPrepareRef); return this; } - public String getUri() { - return uri; - } - - /** - * Uri of the endpoint to use as wire tap - */ - public void setUri(String uri) { - this.uri = uri; - } - - public String getRef() { - return ref; - } - - /** - * Reference of the endpoint to use as wire tap - * - * @deprecated use uri with ref:uri instead - */ - @Deprecated - public void setRef(String ref) { - this.ref = ref; - } - - public Endpoint getEndpoint() { - return endpoint; - } - - public void setEndpoint(Endpoint endpoint) { - this.endpoint = endpoint; - } - public Processor getNewExchangeProcessor() { return newExchangeProcessor; } http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 1d6b835..87a3365 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -24,8 +24,8 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; -import org.apache.camel.Endpoint; -import org.apache.camel.EndpointAware; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; @@ -37,11 +37,9 @@ import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; -import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,10 +48,11 @@ import org.slf4j.LoggerFactory; * * @version */ -public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, EndpointAware, IdAware { +public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class); private String id; - private final Endpoint destination; + private CamelContext camelContext; + private final Expression expression; private final Processor processor; private final ExchangePattern exchangePattern; private final ExecutorService executorService; @@ -66,9 +65,9 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, private boolean copy; private Processor onPrepare; - public WireTapProcessor(Endpoint destination, Processor processor, ExchangePattern exchangePattern, + public WireTapProcessor(Expression expression, Processor processor, ExchangePattern exchangePattern, ExecutorService executorService, boolean shutdownExecutorService) { - this.destination = destination; + this.expression = expression; this.processor = processor; this.exchangePattern = exchangePattern; ObjectHelper.notNull(executorService, "executorService"); @@ -78,12 +77,12 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, @Override public String toString() { - return "WireTap[" + destination + "]"; + return "WireTap[" + expression + "]"; } @Override public String getTraceLabel() { - return "wireTap(" + destination + ")"; + return "wireTap(" + expression + ")"; } public String getId() { @@ -94,8 +93,12 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, this.id = id; } - public Endpoint getEndpoint() { - return destination; + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; } public void process(Exchange exchange) throws Exception { @@ -122,17 +125,11 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, // send the exchange to the destination using an executor service executorService.submit(new Callable<Exchange>() { public Exchange call() throws Exception { - final StopWatch watch = new StopWatch(); try { - EventHelper.notifyExchangeSending(wireTapExchange.getContext(), wireTapExchange, destination); - LOG.debug(">>>> (wiretap) {} {}", destination, wireTapExchange); + LOG.debug(">>>> (wiretap) {} {}", expression, wireTapExchange); processor.process(wireTapExchange); } catch (Throwable e) { - LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + destination + ". This exception will be ignored.", e); - } finally { - // emit event that the exchange was sent to the endpoint - long timeTaken = watch.stop(); - EventHelper.notifyExchangeSent(wireTapExchange.getContext(), wireTapExchange, destination, timeTaken); + LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + expression + ". This exception will be ignored.", e); } return wireTapExchange; } @@ -154,9 +151,6 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, answer = configureNewExchange(exchange); } - // set property which endpoint we send to - answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); - // prepare the exchange if (newExchangeExpression != null) { Object body = newExchangeExpression.evaluate(answer, Object.class); @@ -263,7 +257,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownService(processor); if (shutdownExecutorService) { - destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java index 24e365f..e593349 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java @@ -101,7 +101,7 @@ public class ManagedRouteRemoveWireTapExplicitThreadPoolTest extends ManagementT // create a new thread pool to use for wire tap myThreadPool = Executors.newFixedThreadPool(1); - from("seda:foo").routeId("foo").wireTap("direct:tap").executorService(myThreadPool).to("mock:result"); + from("seda:foo").routeId("foo").wireTap("direct:tap", myThreadPool).to("mock:result"); from("direct:tap").routeId("tap").to("mock:tap"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java index 0dd7cad..578a8ed 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java @@ -62,9 +62,9 @@ public class CBRWithWireTapTest extends ContextTestSupport { from("direct:start") .choice() .when(body().contains("Camel")) - .wireTap("mock:camel").end() + .wireTap("mock:camel") .when(body().contains("Donkey")) - .wireTap("mock:donkey").copy().end() + .wireTap("mock:donkey", true) .otherwise() .to("mock:other"); } http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java index a17836e..7f607e1 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java @@ -67,7 +67,7 @@ public class WireTapCustomPool2Test extends ContextTestSupport { from("direct:start") .to("log:foo") // pass in the custom pool to the wireTap DSL - .wireTap("direct:tap").executorService(pool) + .wireTap().constant("direct:tap").executorService(pool) .to("mock:result"); // END SNIPPET: e1 http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java index 2ecff83..43232ee 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java @@ -52,7 +52,7 @@ public class WireTapNewExchangeTest extends ContextTestSupport { from("direct:start") // tap a new message and send it to direct:tap // the new message should be Bye World with 2 headers - .wireTap("direct:tap") + .wireTap().constant("direct:tap") // create the new tap message body and headers .newExchangeBody(constant("Bye World")) .newExchangeHeader("id", constant(123)) http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java index ce4ae2d..2d3915f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java @@ -37,7 +37,7 @@ public class WireTapOnPrepareRefTest extends WireTapOnPrepareTest { @Override public void configure() throws Exception { from("direct:start") - .wireTap("direct:a").onPrepareRef("deepClone") + .wireTap().constant("direct:a").onPrepareRef("deepClone") .to("direct:b"); from("direct:a").process(new ProcessorA()).to("mock:a"); http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java index 64f04d7..68866c5 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java @@ -51,7 +51,7 @@ public class WireTapOnPrepareTest extends ContextTestSupport { public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") - .wireTap("direct:a").onPrepare(new AnimalDeepClonePrepare()) + .wireTap("direct:a", new AnimalDeepClonePrepare()) .to("direct:b"); // END SNIPPET: e1 http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java index 3ea4043..2ac11fed 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java @@ -80,7 +80,7 @@ public class WireTapUsingFireAndForgetCopyAsDefaultTest extends ContextTestSuppo @Override public void configure() throws Exception { from("direct:start") - .wireTap("direct:foo").newExchange(new Processor() { + .wireTap("direct:foo", new Processor() { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); exchange.getIn().setBody("Bye " + body); @@ -156,7 +156,7 @@ public class WireTapUsingFireAndForgetCopyAsDefaultTest extends ContextTestSuppo @Override public void configure() throws Exception { from("direct:start") - .wireTap("direct:foo").newExchangeBody(simple("Bye ${body}")) + .wireTap("direct:foo", simple("Bye ${body}")) .to("mock:result"); from("direct:foo").to("mock:foo"); http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java index ffb3aee..d1c6a60 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java @@ -80,7 +80,7 @@ public class WireTapUsingFireAndForgetCopyTest extends ContextTestSupport { @Override public void configure() throws Exception { from("direct:start") - .wireTap("direct:foo").copy().newExchange(new Processor() { + .wireTap("direct:foo", true, new Processor() { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); exchange.getIn().setBody("Bye " + body); @@ -156,7 +156,7 @@ public class WireTapUsingFireAndForgetCopyTest extends ContextTestSupport { @Override public void configure() throws Exception { from("direct:start") - .wireTap("direct:foo").copy().newExchangeBody(simple("Bye ${body}")) + .wireTap("direct:foo", true, simple("Bye ${body}")) .to("mock:result"); from("direct:foo").to("mock:foo"); http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml index 386ffe1..f538243 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml @@ -26,7 +26,9 @@ <route> <from uri="file://target/xmldata"/> <convertBodyTo type="java.lang.String"/> - <wireTap uri="mock:wiretap"/> + <wireTap> + <constant>mock:wiretap</constant> + </wireTap> <to uri="mock:result"/> </route> </camelContext> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml index 39677af..c461c57 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml @@ -25,7 +25,9 @@ <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <wireTap uri="direct:tap"/> + <wireTap> + <constant>direct:tap</constant> + </wireTap> <to uri="mock:end"/> </route> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml index 1bf114c..686e270 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml @@ -44,7 +44,9 @@ <to uri="log:otherwise"/> <to uri="mock:otherwise"/> <multicast streaming="true"> - <wireTap uri="mock:trapped"/> + <wireTap> + <constant>mock:trapped</constant> + </wireTap> <split strategyRef="splitAggregate"> <tokenize token=","/> <filter> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml index 3b81716..4e17834 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml @@ -28,7 +28,8 @@ <from uri="direct:start"/> <!-- tap a new message and send it to direct:tap --> <!-- the new message should be Bye World with 2 headers --> - <wireTap uri="direct:tap"> + <wireTap> + <constant>direct:tap</constant> <!-- create the new tap message body and headers --> <body><constant>Bye World</constant></body> <setHeader headerName="id"><constant>123</constant></setHeader> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml index 9abd708..42b1efb 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml @@ -28,8 +28,10 @@ <route> <from uri="direct:start"/> <to uri="log:foo"/> - <wireTap uri="direct:tap"/> - <to uri="mock:result"/> + <wireTap> + <constant>direct:tap</constant> + </wireTap> + <to uri="mock:result"/> </route> <!-- END SNIPPET: e1 --> @@ -43,7 +45,9 @@ <route> <from uri="direct:test"/> - <wireTap uri="direct:a" id="wiretap_1" /> + <wireTap id="wiretap_1"> + <constant>direct:a</constant> + </wireTap> <to uri="mock:a"/> </route> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml index d2d94c8..3a01f27 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml @@ -27,7 +27,8 @@ <!-- START SNIPPET: e1 --> <route> <from uri="direct:start"/> - <wireTap uri="direct:foo"> + <wireTap> + <constant>direct:foo</constant> <body><simple>Bye ${body}</simple></body> </wireTap> <to uri="mock:result"/> @@ -37,7 +38,9 @@ <!-- START SNIPPET: e2 --> <route> <from uri="direct:start2"/> - <wireTap uri="direct:foo" processorRef="myProcessor"/> + <wireTap processorRef="myProcessor"> + <constant>direct:foo</constant> + </wireTap> <to uri="mock:result"/> </route> <!-- END SNIPPET: e2 --> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml index 99becd0..ad39400 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml @@ -27,7 +27,8 @@ <!-- START SNIPPET: e1 --> <route> <from uri="direct:start"/> - <wireTap uri="direct:foo"> + <wireTap> + <constant>direct:foo</constant> <body><constant>Bye World</constant></body> </wireTap> <to uri="mock:result"/> @@ -37,7 +38,9 @@ <!-- START SNIPPET: e2 --> <route> <from uri="direct:start2"/> - <wireTap uri="direct:foo" processorRef="myProcessor"/> + <wireTap processorRef="myProcessor"> + <constant>direct:foo</constant> + </wireTap> <to uri="mock:result"/> </route> <!-- END SNIPPET: e2 --> http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml index 3ed30d1..c9ae5ad 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml @@ -27,7 +27,9 @@ <route> <from uri="direct:start"/> <!-- use on prepare with wiretap --> - <wireTap uri="direct:a" onPrepareRef="animalDeepClonePrepare"/> + <wireTap onPrepareRef="animalDeepClonePrepare"> + <constant>direct:a</constant> + </wireTap> <to uri="direct:b"/> </route>