Author: davsclaus Date: Mon Nov 16 14:25:19 2009 New Revision: 880767 URL: http://svn.apache.org/viewvc?rev=880767&view=rev Log: CAMEL-2178: Added support for simulating async request/reply in case the producer does not natively support AsyncProcessor. The original thread will still see it as async request/reply and not block.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=880767&r1=880766&r2=880767&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Mon Nov 16 14:25:19 2009 @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -46,6 +47,7 @@ protected final Processor target; protected final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>(); protected ExecutorService executorService; + protected ExecutorService producerExecutorService; protected int poolSize = DEFAULT_THREADPOOL_SIZE; protected ExceptionHandler exceptionHandler; @@ -84,8 +86,6 @@ public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { exchange = configureExchange(exchange, pattern); - AsyncProcessor asyncProducer = exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class, producer); - // pass in the callback that adds the exchange to the completed list of tasks final AsyncCallback callback = new AsyncCallback() { public void onTaskCompleted(Exchange exchange) { @@ -93,8 +93,13 @@ } }; - // produce it async - asyncProducer.process(exchange, callback); + if (producer instanceof AsyncProcessor) { + // producer is async capable so let it process it directly + doAsyncProcess((AsyncProcessor) producer, 
exchange, callback); + } else { + // producer is a regular processor so simulate async behaviour + doSimulateAsyncProcess(producer, exchange, callback); + } // and return the exchange return exchange; @@ -104,6 +109,55 @@ return answer; } + /** + * The producer is already capable of async processing so let it process it directly. + * + * @param producer the async producer + * @param exchange the exchange + * @param callback the callback + * + * @throws Exception can be thrown in case of processing errors + */ + protected void doAsyncProcess(AsyncProcessor producer, Exchange exchange, AsyncCallback callback) throws Exception { + producer.process(exchange, callback); + } + + /** + * The producer is <b>not</b> capable of async processing so lets simulate this by transfering the task + * to another {...@link ExecutorService} for async processing. + * + * @param producer the producer + * @param exchange the exchange + * @param callback the callback + * + * @throws Exception can be thrown in case of processing errors + */ + protected void doSimulateAsyncProcess(final Processor producer, final Exchange exchange, final AsyncCallback callback) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("Producer "+ producer + " is not an instanceof AsyncProcessor" + + ". 
Will fallback to simulate async behavior by transferring task to a producer thread pool for further processing."); + } + + // let the producer thread pool handle the task of sending the request which then will simulate the async + // behavior as the original thread is not blocking while we wait for the reply + getProducerExecutorService().submit(new Callable<Exchange>() { + public Exchange call() throws Exception { + // convert the async producer which just blocks until the task is complete + try { + AsyncProcessor asyncProducer = exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class, producer); + asyncProducer.process(exchange, callback); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception while processing: " + exchange, e); + } + // set the exception on the exchange so Camel error handling can deal with it + exchange.setException(e); + } + return exchange; + } + }); + } + @Override public String toString() { return "sendAsyncTo(" + destination + (pattern != null ? " " + pattern : "") + " -> " + target + ")"; @@ -111,15 +165,40 @@ public ExecutorService getExecutorService() { if (executorService == null) { - executorService = createExecutorService(); + executorService = createExecutorService("SendAsyncProcessor-Consumer"); } return executorService; } + /** + * Sets the {...@link java.util.concurrent.ExecutorService} to use for consuming replies. 
+ * + * @param executorService the custom executor service + */ public void setExecutorService(ExecutorService executorService) { this.executorService = executorService; } + public ExecutorService getProducerExecutorService() { + if (producerExecutorService == null) { + // use a cached pool for the producers which can grow/schrink itself + producerExecutorService = ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer", true); + } + return producerExecutorService; + } + + /** + * Sets the {...@link java.util.concurrent.ExecutorService} to use for simulating async producers + * by transferring the {...@link Exchange} to this {...@link java.util.concurrent.ExecutorService} for + * sending the request and block while waiting for the reply. However the original thread + * will not block and as such it all appears as real async request/reply mechanism. + * + * @param producerExecutorService the custom executor service for producers + */ + public void setProducerExecutorService(ExecutorService producerExecutorService) { + this.producerExecutorService = producerExecutorService; + } + public int getPoolSize() { return poolSize; } @@ -156,7 +235,6 @@ while (isRunAllowed()) { Exchange exchange; try { - // TODO: Wonder if we can use take instead of poll with timeout? exchange = completedTasks.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.debug("Sleep interrupted, are we stopping? 
" + (isStopping() || isStopped())); @@ -183,8 +261,8 @@ } } - protected ExecutorService createExecutorService() { - return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "SendAsyncProcessor", true); + protected ExecutorService createExecutorService(String name) { + return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true); } protected void doStart() throws Exception { @@ -198,6 +276,10 @@ protected void doStop() throws Exception { super.doStop(); + if (producerExecutorService != null) { + producerExecutorService.shutdownNow(); + producerExecutorService = null; + } if (executorService != null) { executorService.shutdownNow(); executorService = null;