CAMEL-7570 Fixed the issue that enrich doesn't send out ExchangeSendingEvent 
nor ExchangeSentEvent


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2247cd94
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2247cd94
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2247cd94

Branch: refs/heads/camel-2.12.x
Commit: 2247cd94f56250b21e995415efe99d314625fd3d
Parents: bafde22
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Jul 3 11:31:52 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Jul 3 16:08:10 2014 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/processor/Enricher.java | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2247cd94/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 092e4d2..72faa12 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
@@ -27,11 +28,12 @@ import 
org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
@@ -109,7 +111,11 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor {
      */
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         final Exchange resourceExchange = createResourceExchange(exchange, 
ExchangePattern.InOut);
-
+        final Endpoint destination = producer.getEndpoint();
+        
+        EventHelper.notifyExchangeSending(exchange.getContext(), 
resourceExchange, destination);
+        // record timing for sending the exchange using the producer
+        final StopWatch watch = new StopWatch();
         AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
         boolean sync = ap.process(resourceExchange, new AsyncCallback() {
             public void done(boolean doneSync) {
@@ -118,6 +124,10 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor {
                     return;
                 }
 
+                // emit event that the exchange was sent to the endpoint
+                long timeTaken = watch.stop();
+                EventHelper.notifyExchangeSent(resourceExchange.getContext(), 
resourceExchange, destination, timeTaken);
+                
                 if (resourceExchange.isFailed()) {
                     // copy resource exchange onto original exchange 
(preserving pattern)
                     copyResultsPreservePattern(exchange, resourceExchange);
@@ -157,6 +167,10 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor {
 
         LOG.trace("Processing exchangeId: {} is continued being processed 
synchronously", exchange.getExchangeId());
 
+        // emit event that the exchange was sent to the endpoint
+        long timeTaken = watch.stop();
+        EventHelper.notifyExchangeSent(resourceExchange.getContext(), 
resourceExchange, destination, timeTaken);
+        
         if (resourceExchange.isFailed()) {
             // copy resource exchange onto original exchange (preserving 
pattern)
             copyResultsPreservePattern(exchange, resourceExchange);

Reply via email to