This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch context-value-scoped-value-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit f6918961fcbced447beea3f41fcb9949daba598d Author: Guillaume Nodet <[email protected]> AuthorDate: Fri Jan 9 11:10:22 2026 +0100 Refactor SedaConsumer to use template method pattern for extensibility Extract template method hooks in SedaConsumer to allow subclasses to customize polling behavior without duplicating the entire doRun() loop: - beforePoll(): Called before polling, returns true to proceed or false to skip this iteration. Allows acquiring resources like permits. - afterPollEmpty(): Called when poll returns no message. Allows releasing resources. - processPolledExchange(Exchange): Processes the polled exchange. Default is inline processing; can be overridden to dispatch to another thread. Also made these methods protected for subclass access: - createExecutor(int poolSize): Creates the executor service - setupTasks(): Sets up thread pool and tasks - shutdownExecutor(): Shuts down executors - isShutdownPending()/setShutdownPending(): Access shutdown state - pollTimeout field: Made protected ThreadPerTaskSedaConsumer now simply overrides these hooks instead of duplicating the entire polling loop, reducing code from 223 to 158 lines and improving maintainability. --- .../apache/camel/component/seda/SedaConsumer.java | 100 ++++++++++--- .../component/seda/ThreadPerTaskSedaConsumer.java | 157 +++++++++++++++++++++ 2 files changed, 241 insertions(+), 16 deletions(-) diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 6efe2ef8af08..845fd9045513 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -53,13 +53,25 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA private volatile boolean shutdownPending; private volatile boolean forceShutdown; private ExecutorService executor; - private final int pollTimeout; + protected final int pollTimeout; public SedaConsumer(SedaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.pollTimeout = endpoint.getPollTimeout(); } + protected int getPollTimeout() { + return pollTimeout; + } + + protected boolean isShutdownPending() { + return shutdownPending; + } + + protected void setShutdownPending(boolean shutdownPending) { + this.shutdownPending = shutdownPending; + } + @Override public SedaEndpoint getEndpoint() { return (SedaEndpoint) super.getEndpoint(); @@ -174,6 +186,11 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA Exchange exchange = null; try { + // hook for subclasses to prepare before polling (e.g., acquire permits) + if (!beforePoll()) { + continue; + } + // use the end user configured poll timeout exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); if (LOG.isTraceEnabled()) { @@ -182,20 +199,20 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA } if (exchange != null) { try { - final Exchange original = exchange; - // prepare the exchange before sending to consumer - final Exchange prepared = prepareExchange(exchange); - // callback to be executed when sending to consumer and processing is done - AsyncCallback callback = doneSync -> onProcessingDone(original, prepared); - // process the exchange - sendToConsumers(prepared, callback); + // process the exchange (subclasses can override to dispatch to another thread) + processPolledExchange(exchange); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); } - } else if (shutdownPending && queue.isEmpty()) { - LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); - // we want to shutdown so break out if there queue is empty - break; + } else { + // hook for subclasses to cleanup after empty poll (e.g., release permits) + afterPollEmpty(); + if (shutdownPending && queue.isEmpty()) { + LOG.trace( + "Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); + // we want to shutdown so break out if there queue is empty + break; + } } } catch (InterruptedException e) { LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); @@ -210,6 +227,42 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA } } + /** + * Hook called before polling the queue. Subclasses can override to acquire resources (e.g., permits). + * + * @return true to proceed with polling, false to skip this poll iteration + * @throws InterruptedException if interrupted while acquiring resources + */ + protected boolean beforePoll() throws InterruptedException { + return true; + } + + /** + * Hook called when poll returned no exchange. Subclasses can override to release resources (e.g., permits). + */ + protected void afterPollEmpty() { + // nothing by default + } + + /** + * Process a polled exchange. Subclasses can override to dispatch to another thread. + * + * @param exchange the exchange to process + */ + protected void processPolledExchange(Exchange exchange) { + final Exchange original = exchange; + // prepare the exchange before sending to consumer + final Exchange prepared = prepareExchange(exchange); + // callback to be executed when sending to consumer and processing is done + AsyncCallback callback = doneSync -> onProcessingDone(original, prepared); + // process the exchange + try { + sendToConsumers(prepared, callback); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + } + /** * Strategy to invoke when the exchange is done being processed. * <p/> @@ -332,23 +385,38 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA shutdownExecutor(); } - private void shutdownExecutor() { + /** + * Shuts down the executor service. + */ + protected void shutdownExecutor() { if (executor != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); executor = null; } } + /** + * Creates the executor service used for consumer threads. + * <p/> + * Subclasses can override this method to provide a different executor, such as one using virtual threads. + * + * @param poolSize the number of concurrent consumers + * @return the executor service + */ + protected ExecutorService createExecutor(int poolSize) { + return getEndpoint().getCamelContext().getExecutorServiceManager() + .newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize); + } + /** * Setup the thread pool and ensures tasks gets executed (if needed) */ - private void setupTasks() { + protected void setupTasks() { int poolSize = getEndpoint().getConcurrentConsumers(); // create thread pool if needed if (executor == null) { - executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, - getEndpoint().getEndpointUri(), poolSize); + executor = createExecutor(poolSize); } // submit needed number of tasks diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java new file mode 100644 index 000000000000..ff879ac75dac --- /dev/null +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A SEDA consumer that spawns a new thread/task for each message instead of using a fixed pool of long-running consumer + * threads. + * <p> + * This consumer model is optimized for virtual threads (JDK 21+) where creating threads is very cheap, but it also + * works with platform threads. The key differences from {@link SedaConsumer} are: + * <ul> + * <li>Uses a cached thread pool instead of a fixed pool</li> + * <li>A single coordinator thread polls the queue</li> + * <li>Each message is processed in its own task/thread</li> + * <li>The concurrentConsumers setting becomes a concurrency limit (0 = unlimited)</li> + * </ul> + * <p> + * When virtual threads are enabled via {@code camel.threads.virtual.enabled=true}, the cached thread pool will use + * {@code Executors.newThreadPerTaskExecutor()}, providing optimal scaling for I/O-bound workloads. + */ +public class ThreadPerTaskSedaConsumer extends SedaConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(ThreadPerTaskSedaConsumer.class); + + private final int maxConcurrentTasks; + private final LongAdder activeTasks = new LongAdder(); + + private volatile ExecutorService taskExecutor; + private volatile Semaphore concurrencyLimiter; + + public ThreadPerTaskSedaConsumer(SedaEndpoint endpoint, Processor processor) { + super(endpoint, processor); + // Use concurrentConsumers as the max concurrent tasks limit + // 0 means unlimited (the most common case for virtual threads) + this.maxConcurrentTasks = endpoint.getConcurrentConsumers(); + } + + @Override + protected ExecutorService createExecutor(int poolSize) { + // Create a single-thread executor for the coordinator + // The actual work is done by taskExecutor + return getEndpoint().getCamelContext().getExecutorServiceManager() + .newSingleThreadExecutor(this, getEndpoint().getEndpointUri() + "-coordinator"); + } + + @Override + protected void setupTasks() { + // Create task executor - uses virtual threads when enabled + taskExecutor = getEndpoint().getCamelContext().getExecutorServiceManager() + .newCachedThreadPool(this, getEndpoint().getEndpointUri() + "-task"); + + // Create concurrency limiter if max is specified and > 0 + if (maxConcurrentTasks > 0) { + concurrencyLimiter = new Semaphore(maxConcurrentTasks); + LOG.debug("Using concurrency limit of {} for thread-per-task consumer", maxConcurrentTasks); + } + + // Call parent to create the coordinator executor and start it + super.setupTasks(); + + LOG.info("Started thread-per-task SEDA consumer for {} (maxConcurrent={})", + getEndpoint().getEndpointUri(), maxConcurrentTasks > 0 ? maxConcurrentTasks : "unlimited"); + } + + @Override + protected void shutdownExecutor() { + super.shutdownExecutor(); + if (taskExecutor != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(taskExecutor); + taskExecutor = null; + } + } + + @Override + protected boolean beforePoll() throws InterruptedException { + // Acquire permit if using concurrency limiter (blocks if at limit) + if (concurrencyLimiter != null) { + return concurrencyLimiter.tryAcquire(pollTimeout, TimeUnit.MILLISECONDS); + } + return true; + } + + @Override + protected void afterPollEmpty() { + // Release permit if we acquired one + if (concurrencyLimiter != null) { + concurrencyLimiter.release(); + } + } + + @Override + protected void processPolledExchange(Exchange exchange) { + // Dispatch to task executor for processing + taskExecutor.execute(() -> { + activeTasks.increment(); + try { + // Prepare the exchange + Exchange prepared = prepareExchange(exchange); + + // Process asynchronously + AsyncCallback callback = doneSync -> { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } + }; + sendToConsumers(prepared, callback); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } finally { + activeTasks.decrement(); + if (concurrencyLimiter != null) { + concurrencyLimiter.release(); + } + } + }); + } + + /** + * Returns the current number of active processing tasks. + */ + public long getActiveTaskCount() { + return activeTasks.sum(); + } + + /** + * Returns the maximum concurrent tasks allowed (0 means unlimited). + */ + public int getMaxConcurrentTasks() { + return maxConcurrentTasks; + } +}
