Hello, We have noticed that under heavy load (more than 1000 requests at the same time), the threads used by Camel remain in a waiting state and block the caller thread. This behavior occurs when using multicast and split, but only when a timeout is configured on our routes. We reproduced it with the simple test below, using Camel 3.16.0 with OpenJDK 11. Could you help with that? Thank you
import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.FluentProducerTemplate; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.engine.DefaultFluentProducerTemplate; import org.apache.camel.processor.aggregate.StringAggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; public class CamelTester { private static final AtomicLong START_TIME = new AtomicLong(); public static void main(String[] args) throws Exception { CamelContext camelContext = startCamel(); FluentProducerTemplate fluentProducerTemplate = DefaultFluentProducerTemplate .on(camelContext) .withTemplateCustomizer( template -> { template.setMaximumCacheSize(100); } ); IntStream.range(1, 2000).forEach(i -> { new Thread(() -> { fluentProducerTemplate .withBody("message-" + i) .to("direct:start") .send(); }).start(); }); Thread.sleep(24 * 60 * 60 * 1000); } private static CamelContext startCamel() throws Exception { log("start Camel Context..."); CamelContext camelContext = new DefaultCamelContext(); camelContext.addRoutes(createBasicRoute(camelContext)); camelContext.start(); return camelContext; } static RouteBuilder createBasicRoute(CamelContext camelContext) { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .multicast().parallelProcessing() .aggregationStrategy(new StringAggregationStrategy()) .timeout(10000) .to("direct:a") .to("direct:b") .to("direct:c") .to("direct:def") .end() .to("file:c:/tmp/outbox?charset=iso-8859-1"); from("direct:def").routeId("def") .split(body().tokenize("message-")).parallelProcessing() .aggregationStrategy(new StringAggregationStrategy()) .timeout(10000) .to("direct:d") .to("direct:e") .to("direct:f") 
.end(); from("direct:a").routeId("a") .process(new CustomProcessor("route-A")); from("direct:b").routeId("b") .process(new CustomProcessor("route-B")); from("direct:c").routeId("c") .process(new CustomProcessor("route-C")); from("direct:d").routeId("d") .process(new CustomProcessor("route-D")); from("direct:e").routeId("e") .process(new CustomProcessor("route-E")); from("direct:f").routeId("f") .process(new CustomProcessor("route-F")); } }; } public static void log(String msg) { long now = System.currentTimeMillis(); START_TIME.compareAndSet(0, now); long elapsed = now - START_TIME.get(); String name = Thread.currentThread().getName(); System.out.format("%2$-4s %1$-26s %3$s\n", name, elapsed, msg); } static private class CustomProcessor implements Processor { String routeId; public CustomProcessor(String route) { this.routeId = route; } @Override public void process(Exchange exchange) throws Exception { log("Start processing for the route " + routeId + " " + exchange.getMessage().getBody(String.class)); int waitingTime = ThreadLocalRandom.current().nextInt(1, 2); Thread.sleep(waitingTime * 1000); String message = exchange.getMessage().getBody(String.class); message += "-" + routeId + "-"; exchange.getMessage().setBody(message, String.class); log("End processing for the route " + routeId + " after " + waitingTime + "s " + message); } } static private class CostumAggregationStrategy extends UseLatestAggregationStrategy { public void timeout(Exchange oldExchange, int index, int total, long timeout) { System.out.println("timeout..."); } } }