This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit eee4e0e5f09bbae7574e3113bd1a612e1892fea3 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Sep 2 09:45:25 2019 +0200 CAMEL-13925: camel-seda - SedaConsumer should extend DefaultConsumer --- .../apache/camel/component/seda/SedaConsumer.java | 85 ++++++++-------------- 1 file changed, 30 insertions(+), 55 deletions(-) diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index d5517b8..e6c2bde 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -24,21 +24,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncProcessor; -import org.apache.camel.Consumer; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.Suspendable; -import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; -import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.LoggingExceptionHandler; import org.apache.camel.support.UnitOfWorkHelper; -import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.ObjectHelper; /** @@ -47,46 +42,23 @@ import org.apache.camel.util.ObjectHelper; * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second. */ -public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, Suspendable { +public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownAware, Suspendable { private final AtomicInteger taskCount = new AtomicInteger(); private volatile CountDownLatch latch; private volatile boolean shutdownPending; private volatile boolean forceShutdown; - private SedaEndpoint endpoint; - private AsyncProcessor processor; private ExecutorService executor; - private ExceptionHandler exceptionHandler; private final int pollTimeout; public SedaConsumer(SedaEndpoint endpoint, Processor processor) { - this.endpoint = endpoint; - this.processor = AsyncProcessorConverterHelper.convert(processor); + super(endpoint, processor); this.pollTimeout = endpoint.getPollTimeout(); - this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); } @Override - public String toString() { - return "SedaConsumer[" + endpoint + "]"; - } - - @Override - public Endpoint getEndpoint() { - return endpoint; - } - - public ExceptionHandler getExceptionHandler() { - return exceptionHandler; - } - - public void setExceptionHandler(ExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - @Override - public Processor getProcessor() { - return processor; + public SedaEndpoint getEndpoint() { + return (SedaEndpoint) super.getEndpoint(); } @Override @@ -100,10 +72,10 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, public int getPendingExchangesSize() { // the route is shutting down, so either we should purge the queue, // or return how many exchanges are still on the queue - if (endpoint.isPurgeWhenStopping()) { - endpoint.purgeQueue(); + if (getEndpoint().isPurgeWhenStopping()) { + getEndpoint().purgeQueue(); } - return endpoint.getQueue().size(); + return getEndpoint().getQueue().size(); } @Override @@ -159,7 +131,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, } protected void doRun() { - BlockingQueue<Exchange> queue = endpoint.getQueue(); + BlockingQueue<Exchange> queue = getEndpoint().getQueue(); // loop while we are allowed, or if we are stopping loop until the queue is empty while (queue != null && isRunAllowed()) { @@ -246,9 +218,9 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, */ protected Exchange prepareExchange(Exchange exchange) { // send a new copied exchange with new camel context - Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext()); + Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getEndpoint().getCamelContext()); // set the from endpoint - newExchange.setFromEndpoint(endpoint); + newExchange.setFromEndpoint(getEndpoint()); return newExchange; } @@ -265,13 +237,13 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, */ protected void sendToConsumers(final Exchange exchange) throws Exception { // validate multiple consumers has been enabled - int size = endpoint.getConsumers().size(); - if (size > 1 && !endpoint.isMultipleConsumersSupported()) { - throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); + int size = getEndpoint().getConsumers().size(); + if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) { + throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + getEndpoint()); } // if there are multiple consumers then multicast to them - if (endpoint.isMultipleConsumersSupported()) { + if (getEndpoint().isMultipleConsumersSupported()) { if (log.isTraceEnabled()) { log.trace("Multicasting to {} consumers for Exchange: {}", size, exchange); @@ -281,7 +253,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, final List<Synchronization> completions = exchange.handoverCompletions(); // use a multicast processor to process it - AsyncProcessor mp = endpoint.getConsumerMulticastProcessor(); + AsyncProcessor mp = getEndpoint().getConsumerMulticastProcessor(); ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this); // and use the asynchronous routing engine to support it @@ -291,40 +263,43 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, }); } else { // use the regular processor and use the asynchronous routing engine to support it - processor.process(exchange, EmptyAsyncCallback.get()); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } } @Override protected void doStart() throws Exception { - latch = new CountDownLatch(endpoint.getConcurrentConsumers()); + super.doStart(); + latch = new CountDownLatch(getEndpoint().getConcurrentConsumers()); shutdownPending = false; forceShutdown = false; setupTasks(); - endpoint.onStarted(this); + getEndpoint().onStarted(this); } @Override protected void doSuspend() throws Exception { - endpoint.onStopped(this); + getEndpoint().onStopped(this); } @Override protected void doResume() throws Exception { - endpoint.onStarted(this); + getEndpoint().onStarted(this); } @Override protected void doStop() throws Exception { // ensure queue is purged if we stop the consumer - if (endpoint.isPurgeWhenStopping()) { - endpoint.purgeQueue(); + if (getEndpoint().isPurgeWhenStopping()) { + getEndpoint().purgeQueue(); } - endpoint.onStopped(this); + getEndpoint().onStopped(this); shutdownExecutor(); + + super.doStop(); } @Override @@ -334,7 +309,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, private void shutdownExecutor() { if (executor != null) { - endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); executor = null; } } @@ -343,11 +318,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, * Setup the thread pool and ensures tasks gets executed (if needed) */ private void setupTasks() { - int poolSize = endpoint.getConcurrentConsumers(); + int poolSize = getEndpoint().getConcurrentConsumers(); // create thread pool if needed if (executor == null) { - executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); + executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize); } // submit needed number of tasks
