Hi,

Is it possible to check with the latest 3.19.0 release to see if it works?

On Fri, Oct 21, 2022 at 1:35 AM hakim hejam <hakim.he...@gmail.com> wrote:

> Hello,
> We notice that under some circumstances of heavy load (more than 1000
> requests at the same time), threads used by Camel remain in a waiting
> state and block the caller thread. This behavior occurs when using
> multicast and split, but only when we have configured a timeout on our
> routes.
> We reproduced it in a simple test, shown below, using version 3.16.0 with
> OpenJDK 11.
> Could you help with that?
> Thank you
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.FluentProducerTemplate;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
> import org.apache.camel.processor.aggregate.StringAggregationStrategy;
> import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
>
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.stream.IntStream;
>
> public class CamelTester { // reproducer: many concurrent sends into a parallel multicast/split with timeouts
>     /** Set by the first log() call; later calls report milliseconds elapsed since then. */
>     private static final AtomicLong START_TIME = new AtomicLong();
>     /** Starts Camel, then sends "message-i" to direct:start from one new thread per i. */
>     public static void main(String[] args) throws Exception {
>         CamelContext camelContext = startCamel();
>         FluentProducerTemplate fluentProducerTemplate =
> DefaultFluentProducerTemplate
>                 .on(camelContext)
>                 .withTemplateCustomizer(
>                         template -> {
>                             template.setMaximumCacheSize(100);
>                         }
>                 );
>
>         IntStream.range(1, 2000).forEach(i -> { // range end is exclusive: i = 1..1999
>             new Thread(() -> {
>                 fluentProducerTemplate
>                         .withBody("message-" + i)
>                         .to("direct:start")
>                         .send();
>             }).start();
>         });
>
>         Thread.sleep(24 * 60 * 60 * 1000); // blocks the main thread for 24 hours
>     }
>     /** Creates a DefaultCamelContext, registers the routes from createBasicRoute, and starts it. */
>     private static CamelContext startCamel() throws Exception {
>         log("start Camel Context...");
>         CamelContext camelContext = new DefaultCamelContext();
>         camelContext.addRoutes(createBasicRoute(camelContext));
>         camelContext.start();
>         return camelContext;
>     }
>     /** Builds the routes: parallel multicast to a/b/c/def; def runs a parallel split to d/e/f. NOTE(review): camelContext is unused here. */
>     static RouteBuilder createBasicRoute(CamelContext camelContext) {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                         .multicast().parallelProcessing()
>                             .aggregationStrategy(new
> StringAggregationStrategy())
>                             .timeout(10000) // Camel documents this timeout value as milliseconds
>                             .to("direct:a")
>                             .to("direct:b")
>                             .to("direct:c")
>                             .to("direct:def")
>                         .end()
>                         .to("file:c:/tmp/outbox?charset=iso-8859-1");
>
>                 from("direct:def").routeId("def")
>
> .split(body().tokenize("message-")).parallelProcessing()
>                             .aggregationStrategy(new
> StringAggregationStrategy())
>                             .timeout(10000)
>                             .to("direct:d")
>                             .to("direct:e")
>                             .to("direct:f")
>                         .end();
>
>                 from("direct:a").routeId("a")
>                         .process(new CustomProcessor("route-A"));
>                 from("direct:b").routeId("b")
>                         .process(new CustomProcessor("route-B"));
>                 from("direct:c").routeId("c")
>                         .process(new CustomProcessor("route-C"));
>                 from("direct:d").routeId("d")
>                         .process(new CustomProcessor("route-D"));
>                 from("direct:e").routeId("e")
>                         .process(new CustomProcessor("route-E"));
>                 from("direct:f").routeId("f")
>                         .process(new CustomProcessor("route-F"));
>
>             }
>         };
>     }
>     public static void log(String msg) { // prints elapsed ms, then thread name, then msg
>         long now = System.currentTimeMillis();
>         START_TIME.compareAndSet(0, now); // no-op unless START_TIME is still 0
>         long elapsed = now - START_TIME.get();
>         String name = Thread.currentThread().getName();
>         System.out.format("%2$-4s %1$-26s    %3$s\n", name, elapsed, msg);
>     }
>     /** Processor that logs, sleeps, then appends "-<routeId>-" to the message body. */
>     static private class CustomProcessor implements Processor {
>         String routeId;
>
>         public CustomProcessor(String route) {
>             this.routeId = route;
>         }
>
>         @Override
>         public void process(Exchange exchange) throws Exception {
>             log("Start processing for the route " + routeId + " " +
> exchange.getMessage().getBody(String.class));
>             int waitingTime = ThreadLocalRandom.current().nextInt(1, 2); // upper bound is exclusive, so always 1
>             Thread.sleep(waitingTime * 1000); // waitingTime is in seconds
>             String message = exchange.getMessage().getBody(String.class);
>             message += "-" + routeId + "-";
>             exchange.getMessage().setBody(message, String.class);
>             log("End processing for the route " + routeId + " after "
> + waitingTime + "s " + message);
>         }
>     }
>     /** NOTE(review): not referenced by any route in this snippet; both routes use StringAggregationStrategy. */
>     static private class CostumAggregationStrategy extends
> UseLatestAggregationStrategy {
>         public void timeout(Exchange oldExchange, int index, int
> total, long timeout) {
>             System.out.println("timeout...");
>         }
>     }
> }
>

Reply via email to