Author: cschneider Date: Mon Sep 5 13:54:44 2011 New Revision: 1165283 URL: http://svn.apache.org/viewvc?rev=1165283&view=rev Log: CAMEL-4417 Move AsyncProcessorTypeConverter.convert method to processor while keeping the old one for compatibility
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java 
camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java Mon Sep 5 13:54:44 2011 @@ -21,7 +21,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class DirectProducer extends Defa callback.done(true); return true; } else { - AsyncProcessor processor = AsyncProcessorTypeConverter.convert(endpoint.getConsumer().getProcessor()); + AsyncProcessor processor = AsyncProcessorConverterHelper.convert(endpoint.getConsumer().getProcessor()); return AsyncProcessorHelper.process(processor, exchange, callback); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Sep 5 13:54:44 2011 @@ -31,7 +31,7 @@ import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.SuspendableService; import org.apache.camel.impl.LoggingExceptionHandler; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; @@ -61,7 +61,7 @@ public class SedaConsumer extends Servic public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; - this.processor = AsyncProcessorTypeConverter.convert(processor); + this.processor = AsyncProcessorConverterHelper.convert(processor); } @Override Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Mon Sep 5 13:54:44 2011 @@ -20,7 +20,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ServiceHelper; @@ -64,7 +64,7 @@ public class DefaultConsumer 
extends Ser */ public synchronized AsyncProcessor getAsyncProcessor() { if (asyncProcessor == null) { - asyncProcessor = AsyncProcessorTypeConverter.convert(processor); + asyncProcessor = AsyncProcessorConverterHelper.convert(processor); } return asyncProcessor; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Mon Sep 5 13:54:44 2011 @@ -30,7 +30,7 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ProducerCallback; import org.apache.camel.ServicePoolAware; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.spi.ServicePool; import org.apache.camel.support.ServiceSupport; @@ -281,7 +281,7 @@ public class ProducerCache extends Servi try { // invoke the callback - AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(producer); + AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { @Override public void done(boolean doneSync) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=1165283&r1=1165282&r2=1165283&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java Mon Sep 5 13:54:44 2011 @@ -16,13 +16,12 @@ */ package org.apache.camel.impl.converter; -import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; -import org.apache.camel.processor.DelegateProcessor; +import org.apache.camel.processor.AsyncProcessorConverterHelper; /** * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}. @@ -33,49 +32,11 @@ import org.apache.camel.processor.Delega */ public class AsyncProcessorTypeConverter implements TypeConverter { - private static final class ProcessorToAsyncProcessorBridge extends DelegateProcessor implements AsyncProcessor { - - private ProcessorToAsyncProcessorBridge(Processor processor) { - super(processor); - } - - public boolean process(Exchange exchange, AsyncCallback callback) { - if (processor == null) { - // no processor then we are done - callback.done(true); - return true; - } - try { - processor.process(exchange); - } catch (Throwable e) { - // must catch throwable so we catch all - exchange.setException(e); - } finally { - // we are bridging a sync processor as async so callback with true - callback.done(true); - } - return true; - } - - @Override - public String toString() { - if (processor != null) { - return processor.toString(); - } else { - return "Processor is null"; - } - } - } - public <T> T convertTo(Class<T> type, Object value) { if (value != null) { if (type.equals(AsyncProcessor.class)) { - if (value instanceof AsyncProcessor) { - return type.cast(value); - } else if (value instanceof 
Processor) { - // Provide an async bridge to the regular processor. - final Processor processor = (Processor)value; - return type.cast(new ProcessorToAsyncProcessorBridge(processor)); + if (value instanceof Processor) { + return type.cast(AsyncProcessorConverterHelper.convert((Processor)value)); } } } @@ -93,11 +54,14 @@ public class AsyncProcessorTypeConverter public <T> T mandatoryConvertTo(Class<T> type, Exchange exchange, Object value) throws NoTypeConversionAvailableException { return convertTo(type, exchange, value); } - + + /** + * @deprecated use AnycProcessorConverter.convert instead + * @param value + * @return + */ + @Deprecated public static AsyncProcessor convert(Processor value) { - if (value instanceof AsyncProcessor) { - return (AsyncProcessor)value; - } - return new ProcessorToAsyncProcessorBridge(value); + return AsyncProcessorConverterHelper.convert(value); } } Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java?rev=1165283&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessorConverterHelper.java Mon Sep 5 13:54:44 2011 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * A simple converter that can convert any {@link Processor} to an {@link AsyncProcessor}. + * Processing will still occur synchronously but it will provide the required + * notifications that the caller expects. + * + * @version + */ +public final class AsyncProcessorConverterHelper { + + private AsyncProcessorConverterHelper() { + // Helper class + } + + private static final class ProcessorToAsyncProcessorBridge extends DelegateProcessor implements AsyncProcessor { + + private ProcessorToAsyncProcessorBridge(Processor processor) { + super(processor); + } + + public boolean process(Exchange exchange, AsyncCallback callback) { + if (processor == null) { + // no processor then we are done + callback.done(true); + return true; + } + try { + processor.process(exchange); + } catch (Throwable e) { + // must catch throwable so we catch all + exchange.setException(e); + } finally { + // we are bridging a sync processor as async so callback with true + callback.done(true); + } + return true; + } + + @Override + public String toString() { + if (processor != null) { + return processor.toString(); + } else { + return "Processor is null"; + } + } + } + + public static AsyncProcessor convert(Processor value) { + if (value instanceof AsyncProcessor) { + return (AsyncProcessor)value; + } + return new ProcessorToAsyncProcessorBridge(value); + } +} Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Mon Sep 5 13:54:44 2011 @@ -25,7 +25,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Predicate; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ServiceHelper; @@ -46,7 +45,7 @@ public class ChoiceProcessor extends Ser public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise) { this.filters = filters; - this.otherwise = AsyncProcessorTypeConverter.convert(otherwise); + this.otherwise = AsyncProcessorConverterHelper.convert(otherwise); } public void process(Exchange exchange) throws Exception { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Mon Sep 5 13:54:44 2011 @@ -29,7 +29,6 @@ import org.apache.camel.Channel; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Service; -import 
org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.interceptor.StreamCaching; import org.apache.camel.processor.interceptor.TraceFormatter; @@ -301,7 +300,7 @@ public class DefaultChannel extends Serv exchange.getUnitOfWork().pushRouteContext(routeContext); } - AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = async.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // pop the route context we just used Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java Mon Sep 5 13:54:44 2011 @@ -24,7 +24,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ServiceHelper; @@ -53,7 +52,7 @@ public class DelegateAsyncProcessor exte } public DelegateAsyncProcessor(Processor processor) { - this(AsyncProcessorTypeConverter.convert(processor)); + this(AsyncProcessorConverterHelper.convert(processor)); } @Override @@ -70,7 +69,7 @@ public class DelegateAsyncProcessor exte } public void setProcessor(Processor processor) { - this.processor = 
AsyncProcessorTypeConverter.convert(processor); + this.processor = AsyncProcessorConverterHelper.convert(processor); } protected void doStart() throws Exception { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Mon Sep 5 13:54:44 2011 @@ -22,7 +22,6 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -109,7 +108,7 @@ public class Enricher extends ServiceSup public boolean process(final Exchange exchange, final AsyncCallback callback) { final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); - AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer); + AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer); boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of the routing slip Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java?rev=1165283&r1=1165282&r2=1165283&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/InterceptorToAsyncProcessorBridge.java Mon Sep 5 13:54:44 2011 @@ -20,7 +20,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ServiceHelper; @@ -53,7 +52,7 @@ public class InterceptorToAsyncProcessor * @param target the target */ public InterceptorToAsyncProcessorBridge(Processor interceptor, AsyncProcessor target) { - this.interceptor = AsyncProcessorTypeConverter.convert(interceptor); + this.interceptor = AsyncProcessorConverterHelper.convert(interceptor); this.target = target; } @@ -89,7 +88,7 @@ public class InterceptorToAsyncProcessor } public void setTarget(Processor target) { - this.target = AsyncProcessorTypeConverter.convert(target); + this.target = AsyncProcessorConverterHelper.convert(target); } @Override Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Sep 5 13:54:44 2011 @@ -45,7 +45,6 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.builder.ErrorHandlerBuilder; -import 
org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; import org.apache.camel.spi.RouteContext; @@ -569,7 +568,7 @@ public class MulticastProcessor extends } // let the prepared process it, remember to begin the exchange pair - AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() { public void done(boolean doneSync) { @@ -701,7 +700,7 @@ public class MulticastProcessor extends // let the prepared process it, remember to begin the exchange pair // we invoke it synchronously as parallel async routing is too hard - AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); pair.begin(); AsyncProcessorHelper.process(async, exchange); } finally { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Mon Sep 5 13:54:44 2011 @@ -25,7 +25,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; @@ -75,7 +74,7 @@ public class Pipeline extends 
MulticastP // get the next processor Processor processor = processors.next(); - AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, nextExchange, callback, processors, async); // continue as long its being processed synchronously @@ -123,7 +122,7 @@ public class Pipeline extends MulticastP // continue processing the pipeline asynchronously Exchange nextExchange = exchange; while (continueRouting(processors, nextExchange)) { - AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next()); + AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); // check for error if so we should break out if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Sep 5 13:54:44 2011 @@ -29,7 +29,6 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.Message; import org.apache.camel.Predicate; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.model.OnExceptionDefinition; import org.apache.camel.spi.SubUnitOfWorkCallback; import org.apache.camel.util.AsyncProcessorHelper; @@ -185,7 +184,7 @@ public abstract class RedeliveryErrorHan this.redeliveryProcessor = redeliveryProcessor; this.deadLetter = deadLetter; this.output = output; - this.outputAsync = 
AsyncProcessorTypeConverter.convert(output); + this.outputAsync = AsyncProcessorConverterHelper.convert(output); this.redeliveryPolicy = redeliveryPolicy; this.logger = logger; this.deadLetterUri = deadLetterUri; @@ -695,7 +694,7 @@ public abstract class RedeliveryErrorHan exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); // the failure processor could also be asynchronous - AsyncProcessor afp = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() { public void done(boolean sync) { log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Mon Sep 5 13:54:44 2011 @@ -26,7 +26,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; @@ -48,7 +47,7 @@ public class TryProcessor extends Servic private List<AsyncProcessor> processors; public TryProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses, Processor finallyProcessor) { - this.tryProcessor = AsyncProcessorTypeConverter.convert(tryProcessor); + this.tryProcessor = 
AsyncProcessorConverterHelper.convert(tryProcessor); this.catchProcessor = new DoCatchProcessor(catchClauses); this.finallyProcessor = new DoFinallyProcessor(finallyProcessor); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Mon Sep 5 13:54:44 2011 @@ -25,7 +25,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Navigate; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -53,7 +53,7 @@ public class IdempotentConsumer extends this.idempotentRepository = idempotentRepository; this.eager = eager; this.skipDuplicate = skipDuplicate; - this.processor = AsyncProcessorTypeConverter.convert(processor); + this.processor = AsyncProcessorConverterHelper.convert(processor); } @Override Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Mon Sep 5 13:54:44 2011 @@ -23,7 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.processor.Traceable; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ObjectHelper; @@ -208,7 +208,7 @@ public class FailOverLoadBalancer extend } log.debug("Processing failover at attempt {} for {}", attempts, exchange); - AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); return AsyncProcessorHelper.process(albp, exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors)); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Mon Sep 5 13:54:44 2011 @@ -22,7 +22,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import 
org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; /** @@ -40,7 +40,7 @@ public abstract class QueueLoadBalancer if (processor == null) { throw new IllegalStateException("No processors could be chosen to process " + exchange); } else { - AsyncProcessor albp = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); boolean sync = AsyncProcessorHelper.process(albp, exchange, new AsyncCallback() { public void done(boolean doneSync) { // only handle the async case Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java Mon Sep 5 13:54:44 2011 @@ -23,8 +23,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.JndiRegistry; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.AsyncProcessorHelper; @@ -121,7 +121,7 @@ public class AsyncEndpointPolicyTest ext invoked++; // let the original processor continue routing exchange.getIn().setHeader(name, "was wrapped"); - AsyncProcessor ap = AsyncProcessorTypeConverter.convert(processor); + AsyncProcessor ap = AsyncProcessorConverterHelper.convert(processor); 
boolean sync = ap.process(exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of this policy Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original) +++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Mon Sep 5 13:54:44 2011 @@ -28,7 +28,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.impl.DefaultExchangeHolder; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +45,7 @@ public class HazelcastSedaConsumer exten public HazelcastSedaConsumer(final Endpoint endpoint, final Processor processor) { super(endpoint, processor); this.endpoint = (HazelcastSedaEndpoint) endpoint; - this.processor = AsyncProcessorTypeConverter.convert(processor); + this.processor = AsyncProcessorConverterHelper.convert(processor); } @Override Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff 
============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectConsumer.java Mon Sep 5 13:54:44 2011 @@ -23,7 +23,7 @@ import org.apache.camel.ShutdownRunningT import org.apache.camel.SuspendableService; import org.apache.camel.component.routebox.RouteboxConsumer; import org.apache.camel.component.routebox.RouteboxServiceSupport; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.spi.ShutdownAware; public class RouteboxDirectConsumer extends RouteboxServiceSupport implements RouteboxConsumer, ShutdownAware, SuspendableService { @@ -78,7 +78,7 @@ public class RouteboxDirectConsumer exte */ public synchronized AsyncProcessor getAsyncProcessor() { if (asyncProcessor == null) { - asyncProcessor = AsyncProcessorTypeConverter.convert(processor); + asyncProcessor = AsyncProcessorConverterHelper.convert(processor); } return asyncProcessor; } Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/direct/RouteboxDirectProducer.java Mon Sep 5 13:54:44 2011 @@ -26,7 +26,7 @@ import org.apache.camel.Producer; import 
org.apache.camel.ProducerTemplate; import org.apache.camel.component.routebox.RouteboxServiceSupport; import org.apache.camel.component.routebox.strategy.RouteboxDispatcher; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +70,7 @@ public class RouteboxDirectProducer exte RouteboxDispatcher dispatcher = new RouteboxDispatcher(producer); exchange = dispatcher.dispatchAsync(getRouteboxEndpoint(), exchange); if (getRouteboxEndpoint().getConfig().isSendToConsumer()) { - AsyncProcessor processor = AsyncProcessorTypeConverter.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor()); + AsyncProcessor processor = AsyncProcessorConverterHelper.convert(((RouteboxDirectEndpoint)getRouteboxEndpoint()).getConsumer().getProcessor()); flag = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { public void done(boolean doneSync) { // we only have to handle async completion of this policy Modified: camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java?rev=1165283&r1=1165282&r2=1165283&view=diff ============================================================================== --- camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java (original) +++ camel/trunk/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaConsumer.java Mon Sep 5 13:54:44 2011 @@ -29,7 +29,7 @@ import org.apache.camel.ShutdownRunningT import org.apache.camel.component.routebox.RouteboxConsumer; import 
org.apache.camel.component.routebox.RouteboxServiceSupport; import org.apache.camel.component.routebox.strategy.RouteboxDispatcher; -import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; +import org.apache.camel.processor.AsyncProcessorConverterHelper; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.util.AsyncProcessorHelper; import org.slf4j.Logger; @@ -42,7 +42,7 @@ public class RouteboxSedaConsumer extend public RouteboxSedaConsumer(RouteboxSedaEndpoint endpoint, Processor processor) { super(endpoint); - this.setProcessor(AsyncProcessorTypeConverter.convert(processor)); + this.setProcessor(AsyncProcessorConverterHelper.convert(processor)); this.producer = endpoint.getConfig().getInnerProducerTemplate(); }