This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch context-value-scoped-value-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f6918961fcbced447beea3f41fcb9949daba598d
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri Jan 9 11:10:22 2026 +0100

    Refactor SedaConsumer to use template method pattern for extensibility
    
    Extract template method hooks in SedaConsumer to allow subclasses to
    customize polling behavior without duplicating the entire doRun() loop:
    
    - beforePoll(): Called before polling, returns true to proceed or false
      to skip this iteration. Allows acquiring resources like permits.
    
    - afterPollEmpty(): Called when poll returns no message. Allows
      releasing resources.
    
    - processPolledExchange(Exchange): Processes the polled exchange.
      Default is inline processing; can be overridden to dispatch to
      another thread.
    
    Also made these methods protected for subclass access:
    - createExecutor(int poolSize): Creates the executor service
    - setupTasks(): Sets up thread pool and tasks
    - shutdownExecutor(): Shuts down executors
    - isShutdownPending()/setShutdownPending(): Access shutdown state
    - pollTimeout field: Made protected
    
    ThreadPerTaskSedaConsumer now simply overrides these hooks instead of
    duplicating the entire polling loop, reducing code from 223 to 158 lines
    and improving maintainability.
---
 .../apache/camel/component/seda/SedaConsumer.java  | 100 ++++++++++---
 .../component/seda/ThreadPerTaskSedaConsumer.java  | 157 +++++++++++++++++++++
 2 files changed, 241 insertions(+), 16 deletions(-)

diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 6efe2ef8af08..845fd9045513 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -53,13 +53,25 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
     private volatile boolean shutdownPending;
     private volatile boolean forceShutdown;
     private ExecutorService executor;
-    private final int pollTimeout;
+    protected final int pollTimeout;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.pollTimeout = endpoint.getPollTimeout();
     }
 
+    protected int getPollTimeout() {
+        return pollTimeout;
+    }
+
+    protected boolean isShutdownPending() {
+        return shutdownPending;
+    }
+
+    protected void setShutdownPending(boolean shutdownPending) {
+        this.shutdownPending = shutdownPending;
+    }
+
     @Override
     public SedaEndpoint getEndpoint() {
         return (SedaEndpoint) super.getEndpoint();
@@ -174,6 +186,11 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
 
             Exchange exchange = null;
             try {
+                // hook for subclasses to prepare before polling (e.g., 
acquire permits)
+                if (!beforePoll()) {
+                    continue;
+                }
+
                 // use the end user configured poll timeout
                 exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                 if (LOG.isTraceEnabled()) {
@@ -182,20 +199,20 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
                 }
                 if (exchange != null) {
                     try {
-                        final Exchange original = exchange;
-                        // prepare the exchange before sending to consumer
-                        final Exchange prepared = prepareExchange(exchange);
-                        // callback to be executed when sending to consumer 
and processing is done
-                        AsyncCallback callback = doneSync -> 
onProcessingDone(original, prepared);
-                        // process the exchange
-                        sendToConsumers(prepared, callback);
+                        // process the exchange (subclasses can override to 
dispatch to another thread)
+                        processPolledExchange(exchange);
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
                     }
-                } else if (shutdownPending && queue.isEmpty()) {
-                    LOG.trace("Shutdown is pending, so this consumer thread is 
breaking out because the task queue is empty.");
-                    // we want to shutdown so break out if there queue is empty
-                    break;
+                } else {
+                    // hook for subclasses to cleanup after empty poll (e.g., 
release permits)
+                    afterPollEmpty();
+                    if (shutdownPending && queue.isEmpty()) {
+                        LOG.trace(
+                                "Shutdown is pending, so this consumer thread 
is breaking out because the task queue is empty.");
+                        // we want to shutdown so break out if there queue is 
empty
+                        break;
+                    }
                 }
             } catch (InterruptedException e) {
                 LOG.debug("Sleep interrupted, are we stopping? {}", 
isStopping() || isStopped());
@@ -210,6 +227,42 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         }
     }
 
+    /**
+     * Hook called before polling the queue. Subclasses can override to 
acquire resources (e.g., permits).
+     *
+     * @return                      true to proceed with polling, false to 
skip this poll iteration
+     * @throws InterruptedException if interrupted while acquiring resources
+     */
+    protected boolean beforePoll() throws InterruptedException {
+        return true;
+    }
+
+    /**
+     * Hook called when poll returned no exchange. Subclasses can override to 
release resources (e.g., permits).
+     */
+    protected void afterPollEmpty() {
+        // nothing by default
+    }
+
+    /**
+     * Process a polled exchange. Subclasses can override to dispatch to 
another thread.
+     *
+     * @param exchange the exchange to process
+     */
+    protected void processPolledExchange(Exchange exchange) {
+        final Exchange original = exchange;
+        // prepare the exchange before sending to consumer
+        final Exchange prepared = prepareExchange(exchange);
+        // callback to be executed when sending to consumer and processing is 
done
+        AsyncCallback callback = doneSync -> onProcessingDone(original, 
prepared);
+        // process the exchange
+        try {
+            sendToConsumers(prepared, callback);
+        } catch (Exception e) {
+            getExceptionHandler().handleException("Error processing exchange", 
exchange, e);
+        }
+    }
+
     /**
      * Strategy to invoke when the exchange is done being processed.
      * <p/>
@@ -332,23 +385,38 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         shutdownExecutor();
     }
 
-    private void shutdownExecutor() {
+    /**
+     * Shuts down the executor service.
+     */
+    protected void shutdownExecutor() {
         if (executor != null) {
             
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
             executor = null;
         }
     }
 
+    /**
+     * Creates the executor service used for consumer threads.
+     * <p/>
+     * Subclasses can override this method to provide a different executor, 
such as one using virtual threads.
+     *
+     * @param  poolSize the number of concurrent consumers
+     * @return          the executor service
+     */
+    protected ExecutorService createExecutor(int poolSize) {
+        return getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newFixedThreadPool(this, getEndpoint().getEndpointUri(), 
poolSize);
+    }
+
     /**
      * Setup the thread pool and ensures tasks gets executed (if needed)
      */
-    private void setupTasks() {
+    protected void setupTasks() {
         int poolSize = getEndpoint().getConcurrentConsumers();
 
         // create thread pool if needed
         if (executor == null) {
-            executor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
-                    getEndpoint().getEndpointUri(), poolSize);
+            executor = createExecutor(poolSize);
         }
 
         // submit needed number of tasks
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
new file mode 100644
index 000000000000..ff879ac75dac
--- /dev/null
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SEDA consumer that spawns a new thread/task for each message instead of 
using a fixed pool of long-running consumer
+ * threads.
+ * <p>
+ * This consumer model is optimized for virtual threads (JDK 21+) where 
creating threads is very cheap, but it also
+ * works with platform threads. The key differences from {@link SedaConsumer} 
are:
+ * <ul>
+ * <li>Uses a cached thread pool instead of a fixed pool</li>
+ * <li>A single coordinator thread polls the queue</li>
+ * <li>Each message is processed in its own task/thread</li>
+ * <li>The concurrentConsumers setting becomes a concurrency limit (0 = 
unlimited)</li>
+ * </ul>
+ * <p>
+ * When virtual threads are enabled via {@code 
camel.threads.virtual.enabled=true}, the cached thread pool will use
+ * {@code Executors.newThreadPerTaskExecutor()}, providing optimal scaling for 
I/O-bound workloads.
+ */
+public class ThreadPerTaskSedaConsumer extends SedaConsumer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ThreadPerTaskSedaConsumer.class);
+
+    private final int maxConcurrentTasks;
+    private final LongAdder activeTasks = new LongAdder();
+
+    private volatile ExecutorService taskExecutor;
+    private volatile Semaphore concurrencyLimiter;
+
+    public ThreadPerTaskSedaConsumer(SedaEndpoint endpoint, Processor 
processor) {
+        super(endpoint, processor);
+        // Use concurrentConsumers as the max concurrent tasks limit
+        // 0 means unlimited (the most common case for virtual threads)
+        this.maxConcurrentTasks = endpoint.getConcurrentConsumers();
+    }
+
+    @Override
+    protected ExecutorService createExecutor(int poolSize) {
+        // Create a single-thread executor for the coordinator
+        // The actual work is done by taskExecutor
+        return getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newSingleThreadExecutor(this, getEndpoint().getEndpointUri() 
+ "-coordinator");
+    }
+
+    @Override
+    protected void setupTasks() {
+        // Create task executor - uses virtual threads when enabled
+        taskExecutor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newCachedThreadPool(this, getEndpoint().getEndpointUri() + 
"-task");
+
+        // Create concurrency limiter if max is specified and > 0
+        if (maxConcurrentTasks > 0) {
+            concurrencyLimiter = new Semaphore(maxConcurrentTasks);
+            LOG.debug("Using concurrency limit of {} for thread-per-task 
consumer", maxConcurrentTasks);
+        }
+
+        // Call parent to create the coordinator executor and start it
+        super.setupTasks();
+
+        LOG.info("Started thread-per-task SEDA consumer for {} 
(maxConcurrent={})",
+                getEndpoint().getEndpointUri(), maxConcurrentTasks > 0 ? 
maxConcurrentTasks : "unlimited");
+    }
+
+    @Override
+    protected void shutdownExecutor() {
+        super.shutdownExecutor();
+        if (taskExecutor != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(taskExecutor);
+            taskExecutor = null;
+        }
+    }
+
+    @Override
+    protected boolean beforePoll() throws InterruptedException {
+        // Acquire permit if using concurrency limiter (blocks if at limit)
+        if (concurrencyLimiter != null) {
+            return concurrencyLimiter.tryAcquire(pollTimeout, 
TimeUnit.MILLISECONDS);
+        }
+        return true;
+    }
+
+    @Override
+    protected void afterPollEmpty() {
+        // Release permit if we acquired one
+        if (concurrencyLimiter != null) {
+            concurrencyLimiter.release();
+        }
+    }
+
+    @Override
+    protected void processPolledExchange(Exchange exchange) {
+        // Dispatch to task executor for processing
+        taskExecutor.execute(() -> {
+            activeTasks.increment();
+            try {
+                // Prepare the exchange
+                Exchange prepared = prepareExchange(exchange);
+
+                // Process asynchronously
+                AsyncCallback callback = doneSync -> {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error 
processing exchange", exchange,
+                                exchange.getException());
+                    }
+                };
+                sendToConsumers(prepared, callback);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
+            } finally {
+                activeTasks.decrement();
+                if (concurrencyLimiter != null) {
+                    concurrencyLimiter.release();
+                }
+            }
+        });
+    }
+
+    /**
+     * Returns the current number of active processing tasks.
+     */
+    public long getActiveTaskCount() {
+        return activeTasks.sum();
+    }
+
+    /**
+     * Returns the maximum concurrent tasks allowed (0 means unlimited).
+     */
+    public int getMaxConcurrentTasks() {
+        return maxConcurrentTasks;
+    }
+}

Reply via email to