This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch context-value-scoped-value-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9612ea82d9ffbd89dddb88cf74363632be4c6ad4 Author: Guillaume Nodet <[email protected]> AuthorDate: Mon Jan 12 13:34:55 2026 +0100 Add virtualThreadPerTask option to SEDA endpoint Add a new 'virtualThreadPerTask' option to SedaEndpoint that enables the ThreadPerTaskSedaConsumer. When enabled, spawns a new thread for each message instead of using a fixed pool of consumer threads. This model is optimized for virtual threads (JDK 21+) where thread creation is very cheap, making it ideal for I/O-bound workloads. The concurrentConsumers option becomes a limit on max concurrent tasks (0 means unlimited). Changes: - Add virtualThreadPerTask property to SedaEndpoint with getter/setter - Update createNewConsumer() to return ThreadPerTaskSedaConsumer when virtualThreadPerTask is enabled - Update VirtualThreadsLoadTest to support testing with the new mode - Add ThreadPerTaskSedaConsumerTest for unit testing the feature - Regenerate endpoint configurers and metadata files --- .../component/seda/SedaEndpointConfigurer.java | 6 ++ .../component/seda/SedaEndpointUriFactory.java | 3 +- .../org/apache/camel/component/seda/seda.json | 21 +++--- .../apache/camel/component/seda/SedaEndpoint.java | 24 +++++++ .../component/stub/StubEndpointUriFactory.java | 3 +- .../org/apache/camel/component/stub/stub.json | 21 +++--- .../seda/ThreadPerTaskSedaConsumerTest.java | 82 ++++++++++++++++++++++ .../camel/processor/VirtualThreadsLoadTest.java | 22 +++++- 8 files changed, 158 insertions(+), 24 deletions(-) diff --git a/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointConfigurer.java b/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointConfigurer.java index ec2b70431cff..9d762e51bf6c 100644 --- a/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointConfigurer.java +++ b/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointConfigurer.java @@ -56,6 +56,8 @@ public class SedaEndpointConfigurer extends PropertyConfigurerSupport implements case "queue": target.setQueue(property(camelContext, java.util.concurrent.BlockingQueue.class, value)); return true; case "size": target.setSize(property(camelContext, int.class, value)); return true; case "timeout": target.setTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true; + case "virtualthreadpertask": + case "virtualThreadPerTask": target.setVirtualThreadPerTask(property(camelContext, boolean.class, value)); return true; case "waitfortasktocomplete": case "waitForTaskToComplete": target.setWaitForTaskToComplete(property(camelContext, org.apache.camel.WaitForTaskToComplete.class, value)); return true; default: return false; @@ -98,6 +100,8 @@ public class SedaEndpointConfigurer extends PropertyConfigurerSupport implements case "queue": return java.util.concurrent.BlockingQueue.class; case "size": return int.class; case "timeout": return long.class; + case "virtualthreadpertask": + case "virtualThreadPerTask": return boolean.class; case "waitfortasktocomplete": case "waitForTaskToComplete": return org.apache.camel.WaitForTaskToComplete.class; default: return null; @@ -141,6 +145,8 @@ public class SedaEndpointConfigurer extends PropertyConfigurerSupport implements case "queue": return target.getQueue(); case "size": return target.getSize(); case "timeout": return target.getTimeout(); + case "virtualthreadpertask": + case "virtualThreadPerTask": return target.isVirtualThreadPerTask(); case "waitfortasktocomplete": case "waitForTaskToComplete": return target.getWaitForTaskToComplete(); default: return null; diff --git a/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointUriFactory.java b/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointUriFactory.java index a1fbc3d27d6a..de0217a87588 100644 --- a/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointUriFactory.java +++ b/components/camel-seda/src/generated/java/org/apache/camel/component/seda/SedaEndpointUriFactory.java @@ -23,7 +23,7 @@ public class SedaEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(20); + Set<String> props = new HashSet<>(21); props.add("blockWhenFull"); props.add("bridgeErrorHandler"); props.add("browseLimit"); @@ -43,6 +43,7 @@ public class SedaEndpointUriFactory extends org.apache.camel.support.component.E props.add("queue"); props.add("size"); props.add("timeout"); + props.add("virtualThreadPerTask"); props.add("waitForTaskToComplete"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); diff --git a/components/camel-seda/src/generated/resources/META-INF/org/apache/camel/component/seda/seda.json b/components/camel-seda/src/generated/resources/META-INF/org/apache/camel/component/seda/seda.json index 4e8e5b3c7d66..fcafafdfaae0 100644 --- a/components/camel-seda/src/generated/resources/META-INF/org/apache/camel/component/seda/seda.json +++ b/components/camel-seda/src/generated/resources/META-INF/org/apache/camel/component/seda/seda.json @@ -46,15 +46,16 @@ "multipleConsumers": { "index": 7, "kind": "parameter", "displayName": "Multiple Consumers", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging. That is, you can send a message to the SEDA queue and have each [...] "pollTimeout": { "index": 8, "kind": "parameter", "displayName": "Poll Timeout", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "The timeout (in milliseconds) used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue running. Setting a lower value allows the consumer to react mor [...] "purgeWhenStopping": { "index": 9, "kind": "parameter", "displayName": "Purge When Stopping", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to purge the task queue when stopping the consumer\/route. This allows to stop faster, as any pending messages on the queue is discarded." }, - "timeout": { "index": 10, "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "Timeout before a SEDA producer will stop waiting for an asynchronous task to complete. You can disable timeout by using 0 or a negative value." }, - "waitForTaskToComplete": { "index": 11, "kind": "parameter", "displayName": "Wait For Task To Complete", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.WaitForTaskToComplete", "enum": [ "Never", "IfReplyExpected", "Always" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "IfReplyExpected", "description": "Option to specify whether the caller should wait for the async task to complete or not before [...] - "blockWhenFull": { "index": 12, "kind": "parameter", "displayName": "Block When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. By default, an exception will be thrown stating that the queu [...] - "discardIfNoConsumers": { "index": 13, "kind": "parameter", "displayName": "Discard If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. Only one of the options disca [...] - "discardWhenFull": { "index": 14, "kind": "parameter", "displayName": "Discard When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be thrown stating that the queue is full. By enabling this option, the [...] - "failIfNoConsumers": { "index": 15, "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsum [...] - "lazyStartProducer": { "index": 16, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "offerTimeout": { "index": 17, "kind": "parameter", "displayName": "Offer Timeout", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Offer timeout can be added to the block case when queue is full. You can disable timeout by using 0 or a negative value." }, - "browseLimit": { "index": 18, "kind": "parameter", "displayName": "Browse Limit", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "description": "Maximum number of messages to keep in memory available for browsing. Use 0 for unlimited." }, - "queue": { "index": 19, "kind": "parameter", "displayName": "Queue", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.BlockingQueue<org.apache.camel.Exchange>", "deprecated": false, "autowired": false, "secret": false, "description": "Define the queue instance which will be used by the endpoint" } + "virtualThreadPerTask": { "index": 10, "kind": "parameter", "displayName": "Virtual Thread Per Task", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled, spawns a new virtual thread for each message instead of using a fixed pool of consumer threads. This model is optimized for virtual threads (JDK 21) and I [...] + "timeout": { "index": 11, "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "Timeout before a SEDA producer will stop waiting for an asynchronous task to complete. You can disable timeout by using 0 or a negative value." }, + "waitForTaskToComplete": { "index": 12, "kind": "parameter", "displayName": "Wait For Task To Complete", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.WaitForTaskToComplete", "enum": [ "Never", "IfReplyExpected", "Always" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "IfReplyExpected", "description": "Option to specify whether the caller should wait for the async task to complete or not before [...] + "blockWhenFull": { "index": 13, "kind": "parameter", "displayName": "Block When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. By default, an exception will be thrown stating that the queu [...] + "discardIfNoConsumers": { "index": 14, "kind": "parameter", "displayName": "Discard If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. Only one of the options disca [...] + "discardWhenFull": { "index": 15, "kind": "parameter", "displayName": "Discard When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be thrown stating that the queue is full. By enabling this option, the [...] + "failIfNoConsumers": { "index": 16, "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsum [...] + "lazyStartProducer": { "index": 17, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "offerTimeout": { "index": 18, "kind": "parameter", "displayName": "Offer Timeout", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Offer timeout can be added to the block case when queue is full. You can disable timeout by using 0 or a negative value." }, + "browseLimit": { "index": 19, "kind": "parameter", "displayName": "Browse Limit", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "description": "Maximum number of messages to keep in memory available for browsing. Use 0 for unlimited." }, + "queue": { "index": 20, "kind": "parameter", "displayName": "Queue", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.BlockingQueue<org.apache.camel.Exchange>", "deprecated": false, "autowired": false, "secret": false, "description": "Define the queue instance which will be used by the endpoint" } } } diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 5e78f3680329..5385748e8fb7 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -99,6 +99,12 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow description = "The timeout (in milliseconds) used when polling. When a timeout occurs, the consumer can check whether it is" + " allowed to continue running. Setting a lower value allows the consumer to react more quickly upon shutdown.") private int pollTimeout = 1000; + @UriParam(label = "consumer,advanced", + description = "If enabled, spawns a new virtual thread for each message instead of using a fixed pool of consumer threads. " + + "This model is optimized for virtual threads (JDK 21+) and I/O-bound workloads where creating threads is cheap. " + + "The concurrentConsumers option becomes a limit on max concurrent tasks (0 = unlimited). " + + "Requires virtual threads to be enabled via camel.threads.virtual.enabled=true.") + private boolean virtualThreadPerTask; @UriParam(label = "producer", defaultValue = "IfReplyExpected", description = "Option to specify whether the caller should wait for the async task to complete or not before continuing. The" @@ -203,6 +209,9 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow } protected SedaConsumer createNewConsumer(Processor processor) { + if (virtualThreadPerTask) { + return new ThreadPerTaskSedaConsumer(this, processor); + } return new SedaConsumer(this, processor); } @@ -526,6 +535,21 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow this.pollTimeout = pollTimeout; } + @ManagedAttribute + public boolean isVirtualThreadPerTask() { + return virtualThreadPerTask; + } + + /** + * If enabled, spawns a new virtual thread for each message instead of using a fixed pool of consumer threads. This + * model is optimized for virtual threads (JDK 21+) and I/O-bound workloads where creating threads is cheap. The + * concurrentConsumers option becomes a limit on max concurrent tasks (0 = unlimited). Requires virtual threads to + * be enabled via camel.threads.virtual.enabled=true. + */ + public void setVirtualThreadPerTask(boolean virtualThreadPerTask) { + this.virtualThreadPerTask = virtualThreadPerTask; + } + @ManagedAttribute public boolean isPurgeWhenStopping() { return purgeWhenStopping; diff --git a/components/camel-stub/src/generated/java/org/apache/camel/component/stub/StubEndpointUriFactory.java b/components/camel-stub/src/generated/java/org/apache/camel/component/stub/StubEndpointUriFactory.java index 60dc193c0f11..0f3b8b320f1e 100644 --- a/components/camel-stub/src/generated/java/org/apache/camel/component/stub/StubEndpointUriFactory.java +++ b/components/camel-stub/src/generated/java/org/apache/camel/component/stub/StubEndpointUriFactory.java @@ -23,7 +23,7 @@ public class StubEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(20); + Set<String> props = new HashSet<>(21); props.add("blockWhenFull"); props.add("bridgeErrorHandler"); props.add("browseLimit"); @@ -43,6 +43,7 @@ public class StubEndpointUriFactory extends org.apache.camel.support.component.E props.add("queue"); props.add("size"); props.add("timeout"); + props.add("virtualThreadPerTask"); props.add("waitForTaskToComplete"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); diff --git a/components/camel-stub/src/generated/resources/META-INF/org/apache/camel/component/stub/stub.json b/components/camel-stub/src/generated/resources/META-INF/org/apache/camel/component/stub/stub.json index af1a3239779b..6d33364a3a0c 100644 --- a/components/camel-stub/src/generated/resources/META-INF/org/apache/camel/component/stub/stub.json +++ b/components/camel-stub/src/generated/resources/META-INF/org/apache/camel/component/stub/stub.json @@ -48,15 +48,16 @@ "multipleConsumers": { "index": 7, "kind": "parameter", "displayName": "Multiple Consumers", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging. That is, you can send a message to the SEDA queue and have each [...] "pollTimeout": { "index": 8, "kind": "parameter", "displayName": "Poll Timeout", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "The timeout (in milliseconds) used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue running. Setting a lower value allows the consumer to react mor [...] "purgeWhenStopping": { "index": 9, "kind": "parameter", "displayName": "Purge When Stopping", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to purge the task queue when stopping the consumer\/route. This allows to stop faster, as any pending messages on the queue is discarded." }, - "timeout": { "index": 10, "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "Timeout before a SEDA producer will stop waiting for an asynchronous task to complete. You can disable timeout by using 0 or a negative value." }, - "waitForTaskToComplete": { "index": 11, "kind": "parameter", "displayName": "Wait For Task To Complete", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.WaitForTaskToComplete", "enum": [ "Never", "IfReplyExpected", "Always" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "IfReplyExpected", "description": "Option to specify whether the caller should wait for the async task to complete or not before [...] - "blockWhenFull": { "index": 12, "kind": "parameter", "displayName": "Block When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. By default, an exception will be thrown stating that the queu [...] - "discardIfNoConsumers": { "index": 13, "kind": "parameter", "displayName": "Discard If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. Only one of the options disca [...] - "discardWhenFull": { "index": 14, "kind": "parameter", "displayName": "Discard When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be thrown stating that the queue is full. By enabling this option, the [...] - "failIfNoConsumers": { "index": 15, "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsum [...] - "lazyStartProducer": { "index": 16, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] - "offerTimeout": { "index": 17, "kind": "parameter", "displayName": "Offer Timeout", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Offer timeout can be added to the block case when queue is full. You can disable timeout by using 0 or a negative value." }, - "browseLimit": { "index": 18, "kind": "parameter", "displayName": "Browse Limit", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "description": "Maximum number of messages to keep in memory available for browsing. Use 0 for unlimited." }, - "queue": { "index": 19, "kind": "parameter", "displayName": "Queue", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.BlockingQueue<org.apache.camel.Exchange>", "deprecated": false, "autowired": false, "secret": false, "description": "Define the queue instance which will be used by the endpoint" } + "virtualThreadPerTask": { "index": 10, "kind": "parameter", "displayName": "Virtual Thread Per Task", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled, spawns a new virtual thread for each message instead of using a fixed pool of consumer threads. This model is optimized for virtual threads (JDK 21) and I [...] + "timeout": { "index": 11, "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "Timeout before a SEDA producer will stop waiting for an asynchronous task to complete. You can disable timeout by using 0 or a negative value." }, + "waitForTaskToComplete": { "index": 12, "kind": "parameter", "displayName": "Wait For Task To Complete", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.WaitForTaskToComplete", "enum": [ "Never", "IfReplyExpected", "Always" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "IfReplyExpected", "description": "Option to specify whether the caller should wait for the async task to complete or not before [...] + "blockWhenFull": { "index": 13, "kind": "parameter", "displayName": "Block When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer exhausted. By default, an exception will be thrown stating that the queu [...] + "discardIfNoConsumers": { "index": 14, "kind": "parameter", "displayName": "Discard If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. Only one of the options disca [...] + "discardWhenFull": { "index": 15, "kind": "parameter", "displayName": "Discard When Full", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be thrown stating that the queue is full. By enabling this option, the [...] + "failIfNoConsumers": { "index": 16, "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. Only one of the options discardIfNoConsumers and failIfNoConsum [...] + "lazyStartProducer": { "index": 17, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "offerTimeout": { "index": 18, "kind": "parameter", "displayName": "Offer Timeout", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Offer timeout can be added to the block case when queue is full. You can disable timeout by using 0 or a negative value." }, + "browseLimit": { "index": 19, "kind": "parameter", "displayName": "Browse Limit", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "description": "Maximum number of messages to keep in memory available for browsing. Use 0 for unlimited." }, + "queue": { "index": 20, "kind": "parameter", "displayName": "Queue", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.BlockingQueue<org.apache.camel.Exchange>", "deprecated": false, "autowired": false, "secret": false, "description": "Define the queue instance which will be used by the endpoint" } } } diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java new file mode 100644 index 000000000000..f1725fdc6377 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/ThreadPerTaskSedaConsumerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +/** + * Test for the virtualThreadPerTask mode of SEDA consumer + */ +public class ThreadPerTaskSedaConsumerTest extends ContextTestSupport { + + @Test + public void testVirtualThreadPerTask() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + + for (int i = 0; i < 10; i++) { + template.sendBody("seda:test?virtualThreadPerTask=true", "Message " + i); + } + + mock.assertIsSatisfied(); + } + + @Test + public void testVirtualThreadPerTaskWithConcurrencyLimit() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:limited"); + mock.expectedMessageCount(5); + + for (int i = 0; i < 5; i++) { + template.sendBody("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2", "Message " + i); + } + + mock.assertIsSatisfied(); + } + + @Test + public void testVirtualThreadPerTaskHighThroughput() throws Exception { + int messageCount = 100; + MockEndpoint mock = getMockEndpoint("mock:throughput"); + mock.expectedMessageCount(messageCount); + + for (int i = 0; i < messageCount; i++) { + template.sendBody("seda:throughput?virtualThreadPerTask=true", "Message " + i); + } + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("seda:test?virtualThreadPerTask=true") + .to("mock:result"); + + from("seda:limited?virtualThreadPerTask=true&concurrentConsumers=2") + .to("mock:limited"); + + from("seda:throughput?virtualThreadPerTask=true") + .to("mock:throughput"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java index fac56aec7290..291b2b5a6e8a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java @@ -49,6 +49,12 @@ import org.slf4j.LoggerFactory; * <pre> * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core -Dcamel.threads.virtual.enabled=true * </pre> + * <p> + * Run with virtual threads and thread-per-task mode (optimal for virtual threads): + * + * <pre> + * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core -Dcamel.threads.virtual.enabled=true -Dloadtest.virtualThreadPerTask=true + * </pre> */ @Disabled("Manual load test - run explicitly for benchmarking") public class VirtualThreadsLoadTest extends ContextTestSupport { @@ -61,6 +67,9 @@ public class VirtualThreadsLoadTest extends ContextTestSupport { private static final int CONCURRENT_PRODUCERS = Integer.getInteger("loadtest.producers", 50); private static final int CONCURRENT_CONSUMERS = Integer.getInteger("loadtest.consumers", 100); private static final int SIMULATED_IO_DELAY_MS = Integer.getInteger("loadtest.delay", 5); + // When true, uses virtualThreadPerTask mode which spawns a new thread per message + // This is optimal for virtual threads where thread creation is cheap + private static final boolean VIRTUAL_THREAD_PER_TASK = Boolean.getBoolean("loadtest.virtualThreadPerTask"); private final LongAdder processedCount = new LongAdder(); private CountDownLatch completionLatch; @@ -82,7 +91,8 @@ public class VirtualThreadsLoadTest extends ContextTestSupport { System.out.println("Starting load test: " + TOTAL_MESSAGES + " messages, " + CONCURRENT_PRODUCERS + " producers, " + CONCURRENT_CONSUMERS + " consumers, " - + SIMULATED_IO_DELAY_MS + "ms I/O delay"); + + SIMULATED_IO_DELAY_MS + "ms I/O delay" + + (VIRTUAL_THREAD_PER_TASK ? ", virtualThreadPerTask=true" : "")); StopWatch watch = new StopWatch(); @@ -128,6 +138,7 @@ public class VirtualThreadsLoadTest extends ContextTestSupport { System.out.println("Throughput: " + String.format("%.2f", throughput) + " msg/sec"); System.out.println("Average latency: " + String.format("%.2f", avgLatency) + " ms/msg"); System.out.println("Virtual threads: " + System.getProperty("camel.threads.virtual.enabled", "false")); + System.out.println("Thread-per-task mode: " + VIRTUAL_THREAD_PER_TASK); System.out.println(); } @@ -138,7 +149,14 @@ public class VirtualThreadsLoadTest extends ContextTestSupport { public void configure() { // Route with concurrent consumers and simulated I/O delay // Use larger queue size to avoid blocking - from("seda:start?concurrentConsumers=" + CONCURRENT_CONSUMERS + "&size=" + (TOTAL_MESSAGES + 1000)) + String sedaOptions = "concurrentConsumers=" + CONCURRENT_CONSUMERS + + "&size=" + (TOTAL_MESSAGES + 1000); + if (VIRTUAL_THREAD_PER_TASK) { + // Use thread-per-task mode - optimal for virtual threads + // concurrentConsumers becomes a concurrency limit + sedaOptions += "&virtualThreadPerTask=true"; + } + from("seda:start?" + sedaOptions) .routeId("loadTestRoute") .process(new SimulatedIOProcessor()) .process(exchange -> {
