This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch context-value-scoped-value-support
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 59e652d859e4f457335e2bd455ac6d837d919172
Author: Guillaume Nodet <[email protected]>
AuthorDate: Wed Jan 7 12:17:18 2026 +0100

    Add load tests for virtual threads performance comparison
    
    Add two disabled load test classes that can be run manually to compare
    performance between platform threads and virtual threads:
    
    - VirtualThreadsLoadTest: Uses SEDA with concurrent consumers to test
      throughput with simulated I/O delays
    - VirtualThreadsWithThreadsDSLLoadTest: Uses threads() DSL to exercise
      the ContextValue/ScopedValue code paths
    
    Tests are disabled by default and configurable via system properties:
    - loadtest.messages: Number of messages to process (default: 5000)
    - loadtest.producers: Number of producer threads (default: 50)
    - loadtest.consumers: Number of concurrent consumers (default: 100)
    - loadtest.delay: Simulated I/O delay in ms (default: 5-10)
    
    Run with:
      mvn test -Dtest=VirtualThreadsLoadTest \
        -Djunit.jupiter.conditions.deactivate='org.junit.*DisabledCondition' \
        -Dcamel.threads.virtual.enabled=true
---
 .../camel/processor/VirtualThreadsLoadTest.java    | 163 +++++++++++++++++++++
 .../VirtualThreadsWithThreadsDSLLoadTest.java      | 144 ++++++++++++++++++
 2 files changed, 307 insertions(+)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java
new file mode 100644
index 000000000000..fac56aec7290
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load test to compare performance of platform threads vs virtual threads.
+ * <p>
+ * This test is disabled by default as it's meant to be run manually for 
benchmarking.
+ * <p>
+ * Run with platform threads (default):
+ *
+ * <pre>
+ * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core
+ * </pre>
+ * <p>
+ * Run with virtual threads (JDK 21+):
+ *
+ * <pre>
+ * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core 
-Dcamel.threads.virtual.enabled=true
+ * </pre>
+ */
+@Disabled("Manual load test - run explicitly for benchmarking")
+public class VirtualThreadsLoadTest extends ContextTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(VirtualThreadsLoadTest.class);
+
+    // Configuration - adjust these for your environment
+    // With 200 consumers and 5ms delay, theoretical max throughput = 200 * 
1000/5 = 40,000 msg/sec
+    private static final int TOTAL_MESSAGES = 
Integer.getInteger("loadtest.messages", 5_000);
+    private static final int CONCURRENT_PRODUCERS = 
Integer.getInteger("loadtest.producers", 50);
+    private static final int CONCURRENT_CONSUMERS = 
Integer.getInteger("loadtest.consumers", 100);
+    private static final int SIMULATED_IO_DELAY_MS = 
Integer.getInteger("loadtest.delay", 5);
+
+    private final LongAdder processedCount = new LongAdder();
+    private CountDownLatch completionLatch;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        // Log whether virtual threads are enabled
+        boolean virtualThreads = "true".equalsIgnoreCase(
+                System.getProperty("camel.threads.virtual.enabled", "false"));
+        LOG.info("Virtual threads enabled: {}", virtualThreads);
+        return context;
+    }
+
+    @Test
+    public void testHighConcurrencyWithSimulatedIO() throws Exception {
+        completionLatch = new CountDownLatch(TOTAL_MESSAGES);
+        processedCount.reset();
+
+        System.out.println("Starting load test: " + TOTAL_MESSAGES + " 
messages, "
+                           + CONCURRENT_PRODUCERS + " producers, " + 
CONCURRENT_CONSUMERS + " consumers, "
+                           + SIMULATED_IO_DELAY_MS + "ms I/O delay");
+
+        StopWatch watch = new StopWatch();
+
+        // Create producer threads - use virtual threads when available for 
producers too
+        ExecutorService producerPool;
+        try {
+            producerPool = (ExecutorService) Executors.class
+                    .getMethod("newVirtualThreadPerTaskExecutor").invoke(null);
+            System.out.println("Using virtual threads for producers");
+        } catch (Exception e) {
+            producerPool = Executors.newFixedThreadPool(CONCURRENT_PRODUCERS);
+            System.out.println("Using platform threads for producers");
+        }
+
+        for (int i = 0; i < TOTAL_MESSAGES; i++) {
+            final int msgNum = i;
+            producerPool.submit(() -> {
+                try {
+                    template.sendBody("seda:start", "Message-" + msgNum);
+                } catch (Exception e) {
+                    LOG.error("Error sending message", e);
+                }
+            });
+        }
+
+        // Wait for all messages to be processed
+        boolean completed = completionLatch.await(5, TimeUnit.MINUTES);
+
+        long elapsed = watch.taken();
+        producerPool.shutdown();
+
+        // Calculate metrics
+        long processed = processedCount.sum();
+        double throughput = (processed * 1000.0) / elapsed;
+        double avgLatency = elapsed / (double) processed;
+
+        // Use System.out for guaranteed visibility in test output
+        System.out.println();
+        System.out.println("=== Load Test Results ===");
+        System.out.println("Completed: " + (completed ? "YES" : "NO 
(timeout)"));
+        System.out.println("Messages processed: " + processed);
+        System.out.println("Total time: " + elapsed + " ms");
+        System.out.println("Throughput: " + String.format("%.2f", throughput) 
+ " msg/sec");
+        System.out.println("Average latency: " + String.format("%.2f", 
avgLatency) + " ms/msg");
+        System.out.println("Virtual threads: " + 
System.getProperty("camel.threads.virtual.enabled", "false"));
+        System.out.println();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Route with concurrent consumers and simulated I/O delay
+                // Use larger queue size to avoid blocking
+                from("seda:start?concurrentConsumers=" + CONCURRENT_CONSUMERS 
+ "&size=" + (TOTAL_MESSAGES + 1000))
+                        .routeId("loadTestRoute")
+                        .process(new SimulatedIOProcessor())
+                        .process(exchange -> {
+                            processedCount.increment();
+                            completionLatch.countDown();
+                        });
+            }
+        };
+    }
+
+    /**
+     * Processor that simulates I/O delay (e.g., database call, HTTP request). 
This is where virtual threads should show
+     * significant improvement - platform threads block during sleep, while 
virtual threads yield.
+     */
+    private static class SimulatedIOProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // Simulate blocking I/O operation
+            Thread.sleep(SIMULATED_IO_DELAY_MS);
+        }
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java
new file mode 100644
index 000000000000..3a3f4997ed44
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.StopWatch;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load test using the threads() DSL to directly exercise the thread pool 
creation which uses ContextValue/ScopedValue
+ * for the "create processor" context.
+ * <p>
+ * This test is disabled by default as it's meant to be run manually for 
benchmarking.
+ * <p>
+ * Run with platform threads (default):
+ *
+ * <pre>
+ * mvn test -Dtest=VirtualThreadsWithThreadsDSLLoadTest -pl core/camel-core
+ * </pre>
+ * <p>
+ * Run with virtual threads (JDK 21+):
+ *
+ * <pre>
+ * mvn test -Dtest=VirtualThreadsWithThreadsDSLLoadTest -pl core/camel-core 
-Dcamel.threads.virtual.enabled=true
+ * </pre>
+ */
+@Disabled("Manual load test - run explicitly for benchmarking")
+public class VirtualThreadsWithThreadsDSLLoadTest extends ContextTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(VirtualThreadsWithThreadsDSLLoadTest.class);
+
+    // Configuration - can be overridden via system properties
+    private static final int TOTAL_MESSAGES = 
Integer.getInteger("loadtest.messages", 5_000);
+    private static final int CONCURRENT_PRODUCERS = 
Integer.getInteger("loadtest.producers", 50);
+    private static final int THREAD_POOL_SIZE = 
Integer.getInteger("loadtest.poolSize", 20);
+    private static final int MAX_POOL_SIZE = 
Integer.getInteger("loadtest.maxPoolSize", 100);
+    private static final int SIMULATED_IO_DELAY_MS = 
Integer.getInteger("loadtest.delay", 10);
+
+    private final LongAdder processedCount = new LongAdder();
+    private CountDownLatch completionLatch;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        boolean virtualThreads = "true".equalsIgnoreCase(
+                System.getProperty("camel.threads.virtual.enabled", "false"));
+        LOG.info("Virtual threads enabled: {}", virtualThreads);
+        return context;
+    }
+
+    @Test
+    public void testThreadsDSLWithSimulatedIO() throws Exception {
+        completionLatch = new CountDownLatch(TOTAL_MESSAGES);
+        processedCount.reset();
+
+        LOG.info("Starting threads() DSL load test: {} messages, {} producers, 
pool {}-{}, {}ms I/O delay",
+                TOTAL_MESSAGES, CONCURRENT_PRODUCERS, THREAD_POOL_SIZE, 
MAX_POOL_SIZE, SIMULATED_IO_DELAY_MS);
+
+        StopWatch watch = new StopWatch();
+
+        ExecutorService producerPool = 
Executors.newFixedThreadPool(CONCURRENT_PRODUCERS);
+        for (int i = 0; i < TOTAL_MESSAGES; i++) {
+            final int msgNum = i;
+            producerPool.submit(() -> {
+                try {
+                    template.sendBody("direct:start", "Message-" + msgNum);
+                } catch (Exception e) {
+                    LOG.error("Error sending message", e);
+                }
+            });
+        }
+
+        boolean completed = completionLatch.await(5, TimeUnit.MINUTES);
+
+        long elapsed = watch.taken();
+        producerPool.shutdown();
+
+        long processed = processedCount.sum();
+        double throughput = (processed * 1000.0) / elapsed;
+
+        // Use System.out for guaranteed visibility in test output
+        System.out.println();
+        System.out.println("=== threads() DSL Load Test Results ===");
+        System.out.println("Completed: " + (completed ? "YES" : "NO 
(timeout)"));
+        System.out.println("Messages processed: " + processed);
+        System.out.println("Total time: " + elapsed + " ms");
+        System.out.println("Throughput: " + String.format("%.2f", throughput) 
+ " msg/sec");
+        System.out.println("Virtual threads: " + 
System.getProperty("camel.threads.virtual.enabled", "false"));
+        System.out.println();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Route using threads() DSL - this exercises ContextValue for 
createProcessor
+                from("direct:start")
+                        .routeId("threadsDSLLoadTest")
+                        .threads(THREAD_POOL_SIZE, MAX_POOL_SIZE)
+                        .threadName("loadTest")
+                        .process(new SimulatedIOProcessor())
+                        .process(exchange -> {
+                            processedCount.increment();
+                            completionLatch.countDown();
+                        });
+            }
+        };
+    }
+
+    private static class SimulatedIOProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            Thread.sleep(SIMULATED_IO_DELAY_MS);
+        }
+    }
+}

Reply via email to