Hello,
We have noticed that under heavy load (more than 1000
requests at the same time), threads used by Camel remain in a waiting
state and block the caller thread. This behavior occurs when using
multicast and split, but only when we have configured a timeout on our
routes.
We reproduced it with the simple test below, using version 3.16.0 with
OpenJDK 11.
Could you help with that?
Thank you

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
import org.apache.camel.processor.aggregate.StringAggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

/**
 * Stand-alone reproducer that floods a Camel route with concurrent requests.
 *
 * <p>{@link #main(String[])} starts a Camel context, then sends one message per
 * freshly created thread to {@code direct:start}. That route multicasts in
 * parallel (with a timeout) to {@code direct:a}, {@code direct:b},
 * {@code direct:c} and {@code direct:def}; the latter splits the body in
 * parallel (again with a timeout) and sends each part to {@code direct:d},
 * {@code direct:e} and {@code direct:f}. Every leaf route runs a
 * {@link CustomProcessor} that sleeps briefly and appends its route name to
 * the message body.
 */
public class CamelTester {

    /** Wall-clock time (ms) of the first {@link #log(String)} call; 0 until then. */
    private static final AtomicLong START_TIME = new AtomicLong();

    /**
     * Starts Camel, fires the concurrent requests, then keeps the JVM alive.
     *
     * @param args ignored
     * @throws Exception if the Camel context cannot be started or the main
     *                   thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = startCamel();
        FluentProducerTemplate fluentProducerTemplate = DefaultFluentProducerTemplate
                .on(camelContext)
                .withTemplateCustomizer(template -> template.setMaximumCacheSize(100));

        // One raw thread per request is deliberate: the purpose of this class is
        // to hit the route with many simultaneous callers. Note that
        // range(1, 2000) produces the ids 1..1999.
        IntStream.range(1, 2000).forEach(i ->
                new Thread(() ->
                        fluentProducerTemplate
                                .withBody("message-" + i)
                                .to("direct:start")
                                .send()
                ).start());

        // Keep the main thread parked for 24 hours so the state of the worker
        // threads can be inspected. Computed as a long to stay clear of int overflow.
        Thread.sleep(24L * 60 * 60 * 1000);
    }

    /**
     * Creates a {@link DefaultCamelContext}, registers the test routes and starts it.
     *
     * @return the started context
     * @throws Exception if adding the routes or starting the context fails
     */
    private static CamelContext startCamel() throws Exception {
        log("start Camel Context...");
        CamelContext camelContext = new DefaultCamelContext();
        camelContext.addRoutes(createBasicRoute(camelContext));
        camelContext.start();
        return camelContext;
    }

    /**
     * Builds the routes used by the reproducer.
     *
     * @param camelContext currently unused; kept so existing callers still compile
     * @return a builder defining {@code direct:start}, {@code direct:def} and the
     *         six leaf routes {@code direct:a} .. {@code direct:f}
     */
    static RouteBuilder createBasicRoute(CamelContext camelContext) {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Entry route: parallel multicast with a timeout of 10000, results
                // concatenated by StringAggregationStrategy, then written to a file.
                from("direct:start")
                        .multicast().parallelProcessing()
                            .aggregationStrategy(new StringAggregationStrategy())
                            .timeout(10000)
                            .to("direct:a")
                            .to("direct:b")
                            .to("direct:c")
                            .to("direct:def")
                        .end()
                        .to("file:c:/tmp/outbox?charset=iso-8859-1");

                // Nested route: parallel split on the "message-" token, also with
                // a timeout of 10000; each part visits d, e and f in sequence.
                from("direct:def").routeId("def")
                        .split(body().tokenize("message-")).parallelProcessing()
                            .aggregationStrategy(new StringAggregationStrategy())
                            .timeout(10000)
                            .to("direct:d")
                            .to("direct:e")
                            .to("direct:f")
                        .end();

                // Leaf routes: each one just runs a CustomProcessor tagged with its name.
                from("direct:a").routeId("a")
                        .process(new CustomProcessor("route-A"));
                from("direct:b").routeId("b")
                        .process(new CustomProcessor("route-B"));
                from("direct:c").routeId("c")
                        .process(new CustomProcessor("route-C"));
                from("direct:d").routeId("d")
                        .process(new CustomProcessor("route-D"));
                from("direct:e").routeId("e")
                        .process(new CustomProcessor("route-E"));
                from("direct:f").routeId("f")
                        .process(new CustomProcessor("route-F"));
            }
        };
    }

    /**
     * Prints {@code msg} to standard output, prefixed with the milliseconds
     * elapsed since the first call to this method and the current thread name.
     *
     * @param msg the text to print
     */
    public static void log(String msg) {
        long now = System.currentTimeMillis();
        // Only the very first caller succeeds in replacing the initial 0.
        START_TIME.compareAndSet(0, now);
        long elapsed = now - START_TIME.get();
        String name = Thread.currentThread().getName();
        // Argument indexes are explicit: elapsed first, then thread name, then message.
        System.out.format("%2$-4s %1$-26s    %3$s\n", name, elapsed, msg);
    }

    /**
     * Processor that simulates work: it sleeps for a random 1 or 2 seconds and
     * appends {@code "-<routeId>-"} to the message body.
     */
    static private class CustomProcessor implements Processor {
        /** Label written into the log lines and appended to the body. */
        private final String routeId;

        /**
         * @param route label identifying the route this processor belongs to
         */
        public CustomProcessor(String route) {
            this.routeId = route;
        }

        /**
         * Logs, sleeps, tags the body with this processor's route label, logs again.
         *
         * @param exchange the exchange whose message body is read and replaced
         * @throws Exception if the thread is interrupted while sleeping
         */
        @Override
        public void process(Exchange exchange) throws Exception {
            String message = exchange.getMessage().getBody(String.class);
            log("Start processing for the route " + routeId + " " + message);

            // nextInt's upper bound is exclusive: the previous nextInt(1, 2) could
            // only ever return 1. Using 3 yields the intended random 1 or 2 seconds.
            int waitingTime = ThreadLocalRandom.current().nextInt(1, 3);
            Thread.sleep(waitingTime * 1000L);

            message += "-" + routeId + "-";
            exchange.getMessage().setBody(message, String.class);
            log("End processing for the route " + routeId + " after " + waitingTime + "s " + message);
        }
    }

    /**
     * Aggregation strategy that prints a marker line when its timeout hook runs.
     *
     * <p>NOTE(review): this class is not referenced by any route in this file;
     * the routes use {@code StringAggregationStrategy}. The hook below is
     * presumably meant to override the aggregation strategy's timeout callback
     * - confirm against the Camel version in use before relying on it.
     */
    static private class CostumAggregationStrategy extends UseLatestAggregationStrategy {
        /**
         * Prints {@code "timeout..."}; all arguments are ignored.
         *
         * @param oldExchange unused
         * @param index       unused
         * @param total       unused
         * @param timeout     unused
         */
        public void timeout(Exchange oldExchange, int index, int total, long timeout) {
            System.out.println("timeout...");
        }
    }
}

Reply via email to