Hi, could you test with the latest 3.19.0 release to see whether the problem still occurs there?
On Fri, Oct 21, 2022 at 1:35 AM hakim hejam <hakim.he...@gmail.com> wrote: > Hello, > We notice that under some circumstances of heavy load (more than 1000 > requests at the same time), threads used by Camel remain in a waiting > state and block the caller thread. This behavior occurs when using > multicast and split, but only when we have configured a timeout on our > routes. > We reproduced it in the simple test below, using version 3.16.0 with > OpenJDK 11. > Could you help with that? > Thank you > > import org.apache.camel.CamelContext; > import org.apache.camel.Exchange; > import org.apache.camel.FluentProducerTemplate; > import org.apache.camel.Processor; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultCamelContext; > import org.apache.camel.impl.engine.DefaultFluentProducerTemplate; > import org.apache.camel.processor.aggregate.StringAggregationStrategy; > import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; > > import java.util.concurrent.ThreadLocalRandom; > import java.util.concurrent.atomic.AtomicLong; > import java.util.stream.IntStream; > > public class CamelTester { > > private static final AtomicLong START_TIME = new AtomicLong(); > > public static void main(String[] args) throws Exception { > CamelContext camelContext = startCamel(); > FluentProducerTemplate fluentProducerTemplate = > DefaultFluentProducerTemplate > .on(camelContext) > .withTemplateCustomizer( > template -> { > template.setMaximumCacheSize(100); > } > ); > > IntStream.range(1, 2000).forEach(i -> { > new Thread(() -> { > fluentProducerTemplate > .withBody("message-" + i) > .to("direct:start") > .send(); > }).start(); > }); > > Thread.sleep(24 * 60 * 60 * 1000); > } > > private static CamelContext startCamel() throws Exception { > log("start Camel Context..."); > CamelContext camelContext = new DefaultCamelContext(); > camelContext.addRoutes(createBasicRoute(camelContext)); > camelContext.start(); > return 
camelContext; > } > > static RouteBuilder createBasicRoute(CamelContext camelContext) { > return new RouteBuilder() { > @Override > public void configure() throws Exception { > from("direct:start") > .multicast().parallelProcessing() > .aggregationStrategy(new > StringAggregationStrategy()) > .timeout(10000) > .to("direct:a") > .to("direct:b") > .to("direct:c") > .to("direct:def") > .end() > .to("file:c:/tmp/outbox?charset=iso-8859-1"); > > from("direct:def").routeId("def") > > .split(body().tokenize("message-")).parallelProcessing() > .aggregationStrategy(new > StringAggregationStrategy()) > .timeout(10000) > .to("direct:d") > .to("direct:e") > .to("direct:f") > .end(); > > from("direct:a").routeId("a") > .process(new CustomProcessor("route-A")); > from("direct:b").routeId("b") > .process(new CustomProcessor("route-B")); > from("direct:c").routeId("c") > .process(new CustomProcessor("route-C")); > from("direct:d").routeId("d") > .process(new CustomProcessor("route-D")); > from("direct:e").routeId("e") > .process(new CustomProcessor("route-E")); > from("direct:f").routeId("f") > .process(new CustomProcessor("route-F")); > > } > }; > } > public static void log(String msg) { > long now = System.currentTimeMillis(); > START_TIME.compareAndSet(0, now); > long elapsed = now - START_TIME.get(); > String name = Thread.currentThread().getName(); > System.out.format("%2$-4s %1$-26s %3$s\n", name, elapsed, msg); > } > > static private class CustomProcessor implements Processor { > String routeId; > > public CustomProcessor(String route) { > this.routeId = route; > } > > @Override > public void process(Exchange exchange) throws Exception { > log("Start processing for the route " + routeId + " " + > exchange.getMessage().getBody(String.class)); > int waitingTime = ThreadLocalRandom.current().nextInt(1, 2); > Thread.sleep(waitingTime * 1000); > String message = exchange.getMessage().getBody(String.class); > message += "-" + routeId + "-"; > 
exchange.getMessage().setBody(message, String.class); > log("End processing for the route " + routeId + " after " > + waitingTime + "s " + message); > } > } > > static private class CostumAggregationStrategy extends > UseLatestAggregationStrategy { > public void timeout(Exchange oldExchange, int index, int > total, long timeout) { > System.out.println("timeout..."); > } > } > } >