Author: davsclaus
Date: Mon Nov 16 14:25:19 2009
New Revision: 880767

URL: http://svn.apache.org/viewvc?rev=880767&view=rev
Log:
CAMEL-2178: Added support for simulating async request/reply in case producer 
does not natively support AsyncProcessor. The original thread will still see it 
as async request/reply and not block.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=880767&r1=880766&r2=880767&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 Mon Nov 16 14:25:19 2009
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@
     protected final Processor target;
     protected final BlockingQueue<Exchange> completedTasks = new 
LinkedBlockingQueue<Exchange>();
     protected ExecutorService executorService;
+    protected ExecutorService producerExecutorService;
     protected int poolSize = DEFAULT_THREADPOOL_SIZE;
     protected ExceptionHandler exceptionHandler;
 
@@ -84,8 +86,6 @@
             public Exchange doInProducer(Producer producer, Exchange exchange, 
ExchangePattern pattern) throws Exception {
                 exchange = configureExchange(exchange, pattern);
 
-                AsyncProcessor asyncProducer = 
exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class, 
producer);
-
                 // pass in the callback that adds the exchange to the 
completed list of tasks
                 final AsyncCallback callback = new AsyncCallback() {
                     public void onTaskCompleted(Exchange exchange) {
@@ -93,8 +93,13 @@
                     }
                 };
 
-                // produce it async
-                asyncProducer.process(exchange, callback);
+                if (producer instanceof AsyncProcessor) {
+                    // producer is async capable so let it process it directly
+                    doAsyncProcess((AsyncProcessor) producer, exchange, 
callback);
+                } else {
+                    // producer is a regular processor so simulate async 
behaviour
+                    doSimulateAsyncProcess(producer, exchange, callback);
+                }
 
                 // and return the exchange
                 return exchange;
@@ -104,6 +109,55 @@
         return answer;
     }
 
+    /**
+     * The producer is already capable of async processing so let it process 
it directly.
+     *
+     * @param producer the async producer
+     * @param exchange the exchange
+     * @param callback the callback
+     *
+     * @throws Exception can be thrown in case of processing errors
+     */
+    protected void doAsyncProcess(AsyncProcessor producer, Exchange exchange, 
AsyncCallback callback) throws Exception {
+        producer.process(exchange, callback);
+    }
+
+    /**
+     * The producer is <b>not</b> capable of async processing so lets simulate 
this by transferring the task
+     * to another {@link ExecutorService} for async processing.
+     *
+     * @param producer the producer
+     * @param exchange the exchange
+     * @param callback the callback
+     *
+     * @throws Exception can be thrown in case of processing errors
+     */
+    protected void doSimulateAsyncProcess(final Processor producer, final 
Exchange exchange, final AsyncCallback callback) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Producer "+ producer + " is not an instanceof 
AsyncProcessor"
+                + ". Will fallback to simulate async behavior by transferring 
task to a producer thread pool for further processing.");
+        }
+
+        // let the producer thread pool handle the task of sending the request 
which then will simulate the async
+        // behavior as the original thread is not blocking while we wait for 
the reply
+        getProducerExecutorService().submit(new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                // convert the async producer which just blocks until the task 
is complete
+                try {
+                    AsyncProcessor asyncProducer = 
exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class, 
producer);
+                    asyncProducer.process(exchange, callback);
+                } catch (Exception e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Caught exception while processing: " + 
exchange, e);
+                    }
+                    // set the exception on the exchange so Camel error 
handling can deal with it
+                    exchange.setException(e);
+                }
+                return exchange;
+            }
+        });
+    }
+
     @Override
     public String toString() {
         return "sendAsyncTo(" + destination + (pattern != null ? " " + pattern 
: "") + " -> " + target + ")";
@@ -111,15 +165,40 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = createExecutorService();
+            executorService = 
createExecutorService("SendAsyncProcessor-Consumer");
         }
         return executorService;
     }
 
+    /**
+     * Sets the {@link java.util.concurrent.ExecutorService} to use for 
consuming replies.
+     *
+     * @param executorService the custom executor service
+     */
     public void setExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
     }
 
+    public ExecutorService getProducerExecutorService() {
+        if (producerExecutorService == null) {
+            // use a cached pool for the producers which can grow/shrink 
itself
+            producerExecutorService = 
ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer", true);
+        }
+        return producerExecutorService;
+    }
+
+    /**
+     * Sets the {@link java.util.concurrent.ExecutorService} to use for 
simulating async producers
+     * by transferring the {@link Exchange} to this {@link 
java.util.concurrent.ExecutorService} for
+     * sending the request and block while waiting for the reply. However the 
original thread
+     * will not block and as such it all appears as real async request/reply 
mechanism.
+     *
+     * @param producerExecutorService the custom executor service for producers
+     */
+    public void setProducerExecutorService(ExecutorService 
producerExecutorService) {
+        this.producerExecutorService = producerExecutorService;
+    }
+
     public int getPoolSize() {
         return poolSize;
     }
@@ -156,7 +235,6 @@
         while (isRunAllowed()) {
             Exchange exchange;
             try {
-                // TODO: Wonder if we can use take instead of poll with 
timeout?
                 exchange = completedTasks.poll(1000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOG.debug("Sleep interrupted, are we stopping? " + 
(isStopping() || isStopped()));
@@ -183,8 +261,8 @@
         }
     }
 
-    protected ExecutorService createExecutorService() {
-        return 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
"SendAsyncProcessor", true);
+    protected ExecutorService createExecutorService(String name) {
+        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, 
true);
     }
 
     protected void doStart() throws Exception {
@@ -198,6 +276,10 @@
     protected void doStop() throws Exception {
         super.doStop();
 
+        if (producerExecutorService != null) {
+            producerExecutorService.shutdownNow();
+            producerExecutorService = null;
+        }
         if (executorService != null) {
             executorService.shutdownNow();
             executorService = null;


Reply via email to