Repository: camel
Updated Branches:
  refs/heads/master d14fa8c45 -> 01dead92e


CAMEL-8965: WireTap supports dynamic uris like enrich/pollEnrich now does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2696b57b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2696b57b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2696b57b

Branch: refs/heads/master
Commit: 2696b57b831712548d10be1651e053685d6e25da
Parents: d14fa8c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Jul 20 16:18:35 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Jul 20 22:54:07 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/model/ProcessorDefinition.java |  88 ++++++++-----
 .../apache/camel/model/WireTapDefinition.java   | 125 +++----------------
 .../camel/processor/WireTapProcessor.java       |  42 +++----
 ...outeRemoveWireTapExplicitThreadPoolTest.java |   2 +-
 .../camel/processor/CBRWithWireTapTest.java     |   4 +-
 .../camel/processor/WireTapCustomPool2Test.java |   2 +-
 .../camel/processor/WireTapNewExchangeTest.java |   2 +-
 .../processor/WireTapOnPrepareRefTest.java      |   2 +-
 .../camel/processor/WireTapOnPrepareTest.java   |   2 +-
 ...eTapUsingFireAndForgetCopyAsDefaultTest.java |   4 +-
 .../WireTapUsingFireAndForgetCopyTest.java      |   4 +-
 .../FileWireTapWithXMLPayloadIssueTest.xml      |   4 +-
 .../SpringAggregateFromWireTapTest.xml          |   4 +-
 .../processor/SpringComplexBlockWithEndTest.xml |   4 +-
 .../processor/SpringWireTapNewExchangeTest.xml  |   3 +-
 .../spring/processor/SpringWireTapTest.xml      |  10 +-
 .../SpringWireTapUsingFireAndForgetCopyTest.xml |   7 +-
 .../SpringWireTapUsingFireAndForgetTest.xml     |   7 +-
 .../spring/processor/WireTapOnPrepareTest.xml   |   4 +-
 19 files changed, 133 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 17ccb0e..91d0e6f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -1471,6 +1471,7 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         addOutput(answer);
         return ExpressionClause.createAndSetExpression(answer);
     }
+
     /**
      * <a href="http://camel.apache.org/load-balancer.html">Load Balancer 
EIP:</a>
      * Creates a loadbalance
@@ -2255,14 +2256,28 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * destination gets a copy of the original message to avoid the processors
      * interfering with each other using {@link ExchangePattern#InOnly}.
      *
+     * @return the builder
+     */
+    public ExpressionClause<WireTapDefinition> wireTap() {
+        WireTapDefinition answer = new WireTapDefinition();
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
+     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
+     * Sends messages to all its child outputs; so that each processor and
+     * destination gets a copy of the original message to avoid the processors
+     * interfering with each other using {@link ExchangePattern#InOnly}.
+     *
      * @param uri  the destination
      * @return the builder
      */
-    public WireTapDefinition<Type> wireTap(String uri) {
-        WireTapDefinition<Type> answer = new WireTapDefinition<Type>();
-        answer.setUri(uri);
+    public Type wireTap(String uri) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
         addOutput(answer);
-        return answer;
+        return (Type) this;
     }
 
     /**
@@ -2275,15 +2290,13 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param      executorService a custom {@link ExecutorService} to use as 
thread pool
      *             for sending tapped exchanges
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, ExecutorService 
executorService) {
-        WireTapDefinition<Type> answer = new WireTapDefinition<Type>();
-        answer.setUri(uri);
+    public Type wireTap(String uri, ExecutorService executorService) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
         answer.setExecutorService(executorService);
         addOutput(answer);
-        return answer;
+        return (Type) this;
     }
 
     /**
@@ -2296,15 +2309,13 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param      executorServiceRef reference to lookup a custom {@link 
ExecutorService}
      *             to use as thread pool for sending tapped exchanges
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, String 
executorServiceRef) {
-        WireTapDefinition<Type> answer = new WireTapDefinition<Type>();
-        answer.setUri(uri);
+    public Type wireTap(String uri, String executorServiceRef) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
         answer.setExecutorServiceRef(executorServiceRef);
         addOutput(answer);
-        return answer;
+        return (Type) this;
     }
 
     /**
@@ -2318,10 +2329,8 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param uri  the destination
      * @param body expression that creates the body to send
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, Expression body) {
+    public Type wireTap(String uri, Expression body) {
         return wireTap(uri, true, body);
     }
 
@@ -2332,18 +2341,33 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      *
      * @param uri  the destination
      * @param copy whether or not use a copy of the original exchange or a new 
empty exchange
+     * @return the builder
+     */
+    public Type wireTap(String uri, boolean copy) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
+        answer.setCopy(copy);
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
+     * Sends a new {@link org.apache.camel.Exchange} to the destination
+     * using {@link ExchangePattern#InOnly}.
+     *
+     * @param uri  the destination
+     * @param copy whether or not use a copy of the original exchange or a new 
empty exchange
      * @param body expression that creates the body to send
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, boolean copy, 
Expression body) {
-        WireTapDefinition<Type> answer = new WireTapDefinition<Type>();
-        answer.setUri(uri);
+    public Type wireTap(String uri, boolean copy, Expression body) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
         answer.setCopy(copy);
         answer.setNewExchangeExpression(body);
         addOutput(answer);
-        return answer;
+        return (Type) this;
     }
 
     /**
@@ -2357,10 +2381,8 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param uri  the destination
      * @param processor  processor preparing the new exchange to send
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, Processor processor) {
+    public Type wireTap(String uri, Processor processor) {
         return wireTap(uri, true, processor);
     }
 
@@ -2373,16 +2395,14 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param copy whether or not use a copy of the original exchange or a new 
empty exchange
      * @param processor  processor preparing the new exchange to send
      * @return the builder
-     * @deprecated use the fluent builder from {@link WireTapDefinition}, will 
be removed in Camel 3.0
      */
-    @Deprecated
-    public WireTapDefinition<Type> wireTap(String uri, boolean copy, Processor 
processor) {
-        WireTapDefinition<Type> answer = new WireTapDefinition<Type>();
-        answer.setUri(uri);
+    public Type wireTap(String uri, boolean copy, Processor processor) {
+        WireTapDefinition answer = new WireTapDefinition();
+        answer.setExpression(new SimpleExpression(uri));
         answer.setCopy(copy);
         answer.setNewExchangeProcessor(processor);
         addOutput(answer);
-        return answer;
+        return (Type) this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 49631bb..ebe95b1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -27,12 +27,11 @@ import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
@@ -44,15 +43,7 @@ import org.apache.camel.util.CamelContextHelper;
 @Metadata(label = "eip,endpoint,routing")
 @XmlRootElement(name = "wireTap")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends 
NoOutputDefinition<WireTapDefinition<Type>>
-        implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>>, 
EndpointRequiredDefinition {
-    @XmlAttribute
-    protected String uri;
-    @XmlAttribute
-    @Deprecated
-    protected String ref;
-    @XmlTransient
-    protected Endpoint endpoint;
+public class WireTapDefinition extends NoOutputExpressionNode implements 
ExecutorServiceAwareDefinition<WireTapDefinition> {
     @XmlTransient
     private Processor newExchangeProcessor;
     @XmlAttribute(name = "processorRef")
@@ -75,37 +66,17 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
     public WireTapDefinition() {
     }
 
-    public WireTapDefinition(String uri) {
-        setUri(uri);
-    }
-
-    public WireTapDefinition(Endpoint endpoint) {
-        setEndpoint(endpoint);
-    }
-
-    @Override
-    public String getEndpointUri() {
-        if (uri != null) {
-            return uri;
-        } else if (endpoint != null) {
-            return endpoint.getEndpointUri();
-        } else {
-            return null;
-        }
-    }
-
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
         // executor service is mandatory for wire tap
         boolean shutdownThreadPool = 
ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
         ExecutorService threadPool = 
ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", 
this, true);
 
-        // create the producer to send to the wire tapped endpoint
-        Endpoint endpoint = resolveEndpoint(routeContext);
-        Producer producer = endpoint.createProducer();
+        // create the send dynamic producer to send to the wire tapped endpoint
+        Processor dynamicTo = new SendDynamicProcessor(null, getExpression());
 
         // create error handler we need to use for processing the wire tapped
-        Processor target = wrapInErrorHandler(routeContext, producer);
+        Processor target = wrapInErrorHandler(routeContext, dynamicTo);
 
         // and wrap in unit of work
         CamelInternalProcessor internal = new CamelInternalProcessor(target);
@@ -114,7 +85,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
         // is true bt default
         boolean isCopy = getCopy() == null || getCopy();
 
-        WireTapProcessor answer = new WireTapProcessor(endpoint, internal, 
getPattern(), threadPool, shutdownThreadPool);
+        WireTapProcessor answer = new WireTapProcessor(getExpression(), 
internal, getPattern(), threadPool, shutdownThreadPool);
         answer.setCopy(isCopy);
         if (newExchangeProcessorRef != null) {
             newExchangeProcessor = 
routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class);
@@ -147,37 +118,12 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
 
     @Override
     public String toString() {
-        return "WireTap[" + description() + "]";
+        return "WireTap[" + getExpression() + "]";
     }
     
-    protected String description() {
-        return FromDefinition.description(getUri(), getRef(), getEndpoint());
-    }
-
     @Override
     public String getLabel() {
-        return "wireTap[" + description() + "]";
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Type end() {
-        // allow end() to return to previous type so you can continue in the 
DSL
-        return (Type) super.end();
-    }
-
-    @Override
-    public void addOutput(ProcessorDefinition<?> output) {
-        // add outputs on parent as this wiretap does not support outputs
-        getParent().addOutput(output);
-    }
-
-    public Endpoint resolveEndpoint(RouteContext context) {
-        if (endpoint == null) {
-            return context.resolveEndpoint(getUri(), getRef());
-        } else {
-            return endpoint;
-        }
+        return "wireTap[" + getExpression() + "]";
     }
 
     // Fluent API
@@ -190,7 +136,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      *                        for sending tapped exchanges
      * @return the builder
      */
-    public WireTapDefinition<Type> executorService(ExecutorService 
executorService) {
+    public WireTapDefinition executorService(ExecutorService executorService) {
         setExecutorService(executorService);
         return this;
     }
@@ -202,7 +148,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      *                           to use as thread pool for sending tapped 
exchanges
      * @return the builder
      */
-    public WireTapDefinition<Type> executorServiceRef(String 
executorServiceRef) {
+    public WireTapDefinition executorServiceRef(String executorServiceRef) {
         setExecutorServiceRef(executorServiceRef);
         return this;
     }
@@ -212,7 +158,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      *
      * @return the builder
      */
-    public WireTapDefinition<Type> copy() {
+    public WireTapDefinition copy() {
         setCopy(true);
         return this;
     }
@@ -224,7 +170,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      *             if it is false camel will not copy the original exchange 
      * @return the builder
      */
-    public WireTapDefinition<Type> copy(boolean copy) {
+    public WireTapDefinition copy(boolean copy) {
         setCopy(copy);
         return this;
     }
@@ -233,7 +179,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @deprecated will be removed in Camel 3.0 Instead use {@link 
#newExchangeBody(org.apache.camel.Expression)}
      */
     @Deprecated
-    public WireTapDefinition<Type> newExchange(Expression expression) {
+    public WireTapDefinition newExchange(Expression expression) {
         return newExchangeBody(expression);
     }
 
@@ -244,7 +190,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @return the builder
      * @see #newExchangeHeader(String, org.apache.camel.Expression)
      */
-    public WireTapDefinition<Type> newExchangeBody(Expression expression) {
+    public WireTapDefinition newExchangeBody(Expression expression) {
         setNewExchangeExpression(expression);
         return this;
     }
@@ -256,7 +202,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      *            be used for preparing the new exchange to send
      * @return the builder
      */
-    public WireTapDefinition<Type> newExchangeRef(String ref) {
+    public WireTapDefinition newExchangeRef(String ref) {
         setNewExchangeProcessorRef(ref);
         return this;
     }
@@ -268,7 +214,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @return the builder
      * @see #newExchangeHeader(String, org.apache.camel.Expression)
      */
-    public WireTapDefinition<Type> newExchange(Processor processor) {
+    public WireTapDefinition newExchange(Processor processor) {
         setNewExchangeProcessor(processor);
         return this;
     }
@@ -283,7 +229,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @param expression  the expression setting the header value
      * @return the builder
      */
-    public WireTapDefinition<Type> newExchangeHeader(String headerName, 
Expression expression) {
+    public WireTapDefinition newExchangeHeader(String headerName, Expression 
expression) {
         headers.add(new SetHeaderDefinition(headerName, expression));
         return this;
     }
@@ -296,7 +242,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @param onPrepare the processor
      * @return the builder
      */
-    public WireTapDefinition<Type> onPrepare(Processor onPrepare) {
+    public WireTapDefinition onPrepare(Processor onPrepare) {
         setOnPrepare(onPrepare);
         return this;
     }
@@ -309,44 +255,11 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
      * @param onPrepareRef reference to the processor to lookup in the {@link 
org.apache.camel.spi.Registry}
      * @return the builder
      */
-    public WireTapDefinition<Type> onPrepareRef(String onPrepareRef) {
+    public WireTapDefinition onPrepareRef(String onPrepareRef) {
         setOnPrepareRef(onPrepareRef);
         return this;
     }
 
-    public String getUri() {
-        return uri;
-    }
-
-    /**
-     * Uri of the endpoint to use as wire tap
-     */
-    public void setUri(String uri) {
-        this.uri = uri;
-    }
-
-    public String getRef() {
-        return ref;
-    }
-
-    /**
-     * Reference of the endpoint to use as wire tap
-     *
-     * @deprecated use uri with ref:uri instead
-     */
-    @Deprecated
-    public void setRef(String ref) {
-        this.ref = ref;
-    }
-
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
-    public void setEndpoint(Endpoint endpoint) {
-        this.endpoint = endpoint;
-    }
-
     public Processor getNewExchangeProcessor() {
         return newExchangeProcessor;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 1d6b835..87a3365 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -24,8 +24,8 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
@@ -37,11 +37,9 @@ import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +48,11 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class WireTapProcessor extends ServiceSupport implements 
AsyncProcessor, Traceable, EndpointAware, IdAware {
+public class WireTapProcessor extends ServiceSupport implements 
AsyncProcessor, Traceable, IdAware, CamelContextAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(WireTapProcessor.class);
     private String id;
-    private final Endpoint destination;
+    private CamelContext camelContext;
+    private final Expression expression;
     private final Processor processor;
     private final ExchangePattern exchangePattern;
     private final ExecutorService executorService;
@@ -66,9 +65,9 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
     private boolean copy;
     private Processor onPrepare;
 
-    public WireTapProcessor(Endpoint destination, Processor processor, 
ExchangePattern exchangePattern,
+    public WireTapProcessor(Expression expression, Processor processor, 
ExchangePattern exchangePattern,
                             ExecutorService executorService, boolean 
shutdownExecutorService) {
-        this.destination = destination;
+        this.expression = expression;
         this.processor = processor;
         this.exchangePattern = exchangePattern;
         ObjectHelper.notNull(executorService, "executorService");
@@ -78,12 +77,12 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
 
     @Override
     public String toString() {
-        return "WireTap[" + destination + "]";
+        return "WireTap[" + expression + "]";
     }
 
     @Override
     public String getTraceLabel() {
-        return "wireTap(" + destination + ")";
+        return "wireTap(" + expression + ")";
     }
 
     public String getId() {
@@ -94,8 +93,12 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
         this.id = id;
     }
 
-    public Endpoint getEndpoint() {
-        return destination;
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -122,17 +125,11 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
         // send the exchange to the destination using an executor service
         executorService.submit(new Callable<Exchange>() {
             public Exchange call() throws Exception {
-                final StopWatch watch = new StopWatch();
                 try {
-                    
EventHelper.notifyExchangeSending(wireTapExchange.getContext(), 
wireTapExchange, destination);
-                    LOG.debug(">>>> (wiretap) {} {}", destination, 
wireTapExchange);
+                    LOG.debug(">>>> (wiretap) {} {}", expression, 
wireTapExchange);
                     processor.process(wireTapExchange);
                 } catch (Throwable e) {
-                    LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + destination + ". This exception will be 
ignored.", e);
-                } finally {
-                    // emit event that the exchange was sent to the endpoint
-                    long timeTaken = watch.stop();
-                    
EventHelper.notifyExchangeSent(wireTapExchange.getContext(), wireTapExchange, 
destination, timeTaken);
+                    LOG.warn("Error occurred during processing " + 
wireTapExchange + " wiretap to " + expression + ". This exception will be 
ignored.", e);
                 }
                 return wireTapExchange;
             }
@@ -154,9 +151,6 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
             answer = configureNewExchange(exchange);
         }
 
-        // set property which endpoint we send to
-        answer.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri());
-
         // prepare the exchange
         if (newExchangeExpression != null) {
             Object body = newExchangeExpression.evaluate(answer, Object.class);
@@ -263,7 +257,7 @@ public class WireTapProcessor extends ServiceSupport 
implements AsyncProcessor,
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownService(processor);
         if (shutdownExecutorService) {
-            
destination.getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
index 24e365f..e593349 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteRemoveWireTapExplicitThreadPoolTest.java
@@ -101,7 +101,7 @@ public class 
ManagedRouteRemoveWireTapExplicitThreadPoolTest extends ManagementT
                 // create a new thread pool to use for wire tap
                 myThreadPool = Executors.newFixedThreadPool(1);
 
-                
from("seda:foo").routeId("foo").wireTap("direct:tap").executorService(myThreadPool).to("mock:result");
+                from("seda:foo").routeId("foo").wireTap("direct:tap", 
myThreadPool).to("mock:result");
                 from("direct:tap").routeId("tap").to("mock:tap");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
index 0dd7cad..578a8ed 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
@@ -62,9 +62,9 @@ public class CBRWithWireTapTest extends ContextTestSupport {
                 from("direct:start")
                     .choice()
                         .when(body().contains("Camel"))
-                            .wireTap("mock:camel").end()
+                            .wireTap("mock:camel")
                         .when(body().contains("Donkey"))
-                            .wireTap("mock:donkey").copy().end()
+                            .wireTap("mock:donkey", true)
                         .otherwise()
                             .to("mock:other");
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
index a17836e..7f607e1 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
@@ -67,7 +67,7 @@ public class WireTapCustomPool2Test extends 
ContextTestSupport {
                 from("direct:start")
                     .to("log:foo")
                     // pass in the custom pool to the wireTap DSL
-                    .wireTap("direct:tap").executorService(pool)
+                    .wireTap().constant("direct:tap").executorService(pool)
                     .to("mock:result");
                 // END SNIPPET: e1
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
index 2ecff83..43232ee 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
@@ -52,7 +52,7 @@ public class WireTapNewExchangeTest extends 
ContextTestSupport {
                 from("direct:start")
                     // tap a new message and send it to direct:tap
                     // the new message should be Bye World with 2 headers
-                    .wireTap("direct:tap")
+                    .wireTap().constant("direct:tap")
                         // create the new tap message body and headers
                         .newExchangeBody(constant("Bye World"))
                         .newExchangeHeader("id", constant(123))

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
index ce4ae2d..2d3915f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
@@ -37,7 +37,7 @@ public class WireTapOnPrepareRefTest extends WireTapOnPrepareTest {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .wireTap("direct:a").onPrepareRef("deepClone")
+                    .wireTap().constant("direct:a").onPrepareRef("deepClone")
                     .to("direct:b");
 
                 from("direct:a").process(new ProcessorA()).to("mock:a");

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java
index 64f04d7..68866c5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareTest.java
@@ -51,7 +51,7 @@ public class WireTapOnPrepareTest extends ContextTestSupport {
             public void configure() throws Exception {
                 // START SNIPPET: e1
                 from("direct:start")
-                    .wireTap("direct:a").onPrepare(new AnimalDeepClonePrepare())
+                    .wireTap("direct:a", new AnimalDeepClonePrepare())
                     .to("direct:b");
                 // END SNIPPET: e1
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java
index 3ea4043..2ac11fed 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyAsDefaultTest.java
@@ -80,7 +80,7 @@ public class WireTapUsingFireAndForgetCopyAsDefaultTest extends ContextTestSuppo
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .wireTap("direct:foo").newExchange(new Processor() {
+                    .wireTap("direct:foo", new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             String body = exchange.getIn().getBody(String.class);
                             exchange.getIn().setBody("Bye " + body);
@@ -156,7 +156,7 @@ public class WireTapUsingFireAndForgetCopyAsDefaultTest extends ContextTestSuppo
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .wireTap("direct:foo").newExchangeBody(simple("Bye ${body}"))
+                    .wireTap("direct:foo", simple("Bye ${body}"))
                     .to("mock:result");
 
                 from("direct:foo").to("mock:foo");

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java
index ffb3aee..d1c6a60 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetCopyTest.java
@@ -80,7 +80,7 @@ public class WireTapUsingFireAndForgetCopyTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .wireTap("direct:foo").copy().newExchange(new Processor() {
+                    .wireTap("direct:foo", true, new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             String body = exchange.getIn().getBody(String.class);
                             exchange.getIn().setBody("Bye " + body);
@@ -156,7 +156,7 @@ public class WireTapUsingFireAndForgetCopyTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .wireTap("direct:foo").copy().newExchangeBody(simple("Bye ${body}"))
+                    .wireTap("direct:foo", true, simple("Bye ${body}"))
                     .to("mock:result");
 
                 from("direct:foo").to("mock:foo");

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
index 386ffe1..f538243 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
@@ -26,7 +26,9 @@
         <route>
             <from uri="file://target/xmldata"/>
             <convertBodyTo type="java.lang.String"/>
-            <wireTap uri="mock:wiretap"/>
+            <wireTap>
+              <constant>mock:wiretap</constant>
+            </wireTap>
             <to uri="mock:result"/>
         </route>
     </camelContext>

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
index 39677af..c461c57 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
@@ -25,7 +25,9 @@
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <wireTap uri="direct:tap"/>
+            <wireTap>
+                <constant>direct:tap</constant>
+            </wireTap>
             <to uri="mock:end"/>
         </route>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
index 1bf114c..686e270 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
@@ -44,7 +44,9 @@
                     <to uri="log:otherwise"/>
                     <to uri="mock:otherwise"/>
                     <multicast streaming="true">
-                        <wireTap uri="mock:trapped"/>
+                        <wireTap>
+                          <constant>mock:trapped</constant>
+                        </wireTap>
                         <split strategyRef="splitAggregate">
                             <tokenize token=","/>
                             <filter>

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
index 3b81716..4e17834 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
@@ -28,7 +28,8 @@
             <from uri="direct:start"/>
             <!-- tap a new message and send it to direct:tap -->
             <!-- the new message should be Bye World with 2 headers -->
-            <wireTap uri="direct:tap">
+            <wireTap>
+              <constant>direct:tap</constant>
                 <!-- create the new tap message body and headers -->
                 <body><constant>Bye World</constant></body>
                 <setHeader headerName="id"><constant>123</constant></setHeader>

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
index 9abd708..42b1efb 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
@@ -28,8 +28,10 @@
         <route>
             <from uri="direct:start"/>
             <to uri="log:foo"/>
-            <wireTap uri="direct:tap"/>
-            <to uri="mock:result"/>
+            <wireTap>
+              <constant>direct:tap</constant>
+            </wireTap>
+          <to uri="mock:result"/>
         </route>
         <!-- END SNIPPET: e1 -->
 
@@ -43,7 +45,9 @@
         
         <route>
                <from uri="direct:test"/>
-               <wireTap uri="direct:a" id="wiretap_1" />
+               <wireTap id="wiretap_1">
+            <constant>direct:a</constant>
+          </wireTap>
                <to uri="mock:a"/>
         </route>
         

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
index d2d94c8..3a01f27 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
@@ -27,7 +27,8 @@
         <!-- START SNIPPET: e1 -->
         <route>
             <from uri="direct:start"/>
-            <wireTap uri="direct:foo">
+            <wireTap>
+                <constant>direct:foo</constant>
                 <body><simple>Bye ${body}</simple></body>
             </wireTap>
             <to uri="mock:result"/>
@@ -37,7 +38,9 @@
         <!-- START SNIPPET: e2 -->
         <route>
             <from uri="direct:start2"/>
-            <wireTap uri="direct:foo" processorRef="myProcessor"/>
+            <wireTap processorRef="myProcessor">
+                <constant>direct:foo</constant>
+            </wireTap>
             <to uri="mock:result"/>
         </route>
         <!-- END SNIPPET: e2 -->

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
index 99becd0..ad39400 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
@@ -27,7 +27,8 @@
         <!-- START SNIPPET: e1 -->
         <route>
             <from uri="direct:start"/>
-            <wireTap uri="direct:foo">
+            <wireTap>
+                <constant>direct:foo</constant>
                 <body><constant>Bye World</constant></body>
             </wireTap>
             <to uri="mock:result"/>
@@ -37,7 +38,9 @@
         <!-- START SNIPPET: e2 -->
         <route>
             <from uri="direct:start2"/>
-            <wireTap uri="direct:foo" processorRef="myProcessor"/>
+            <wireTap processorRef="myProcessor">
+                <constant>direct:foo</constant>
+            </wireTap>
             <to uri="mock:result"/>
         </route>
         <!-- END SNIPPET: e2 -->

http://git-wip-us.apache.org/repos/asf/camel/blob/2696b57b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
index 3ed30d1..c9ae5ad 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
@@ -27,7 +27,9 @@
         <route>
             <from uri="direct:start"/>
             <!-- use on prepare with wiretap -->
-            <wireTap uri="direct:a" onPrepareRef="animalDeepClonePrepare"/>
+            <wireTap onPrepareRef="animalDeepClonePrepare">
+                <constant>direct:a</constant>
+            </wireTap>
             <to uri="direct:b"/>
         </route>
 

Reply via email to