Hello,
We notice under some circumstances of heavy load and request (more the 1000
requests at the same time), threads used by Camel are still in a waiting
state and block the caller thread. This behavior occurs when using
multicast and split, but only when we have configured a timeout on our
routes.
We reproduce it in a simple test as below, we use the 3.16.0 version with
openjdk 11
could you help with that?
Thank you
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
import org.apache.camel.processor.aggregate.StringAggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
public class CamelTester {
private static final AtomicLong START_TIME = new AtomicLong();
public static void main(String[] args) throws Exception {
CamelContext camelContext = startCamel();
FluentProducerTemplate fluentProducerTemplate =
DefaultFluentProducerTemplate
.on(camelContext)
.withTemplateCustomizer(
template -> {
template.setMaximumCacheSize(100);
}
);
IntStream.range(1, 2000).forEach(i -> {
new Thread(() -> {
fluentProducerTemplate
.withBody("message-" + i)
.to("direct:start")
.send();
}).start();
});
Thread.sleep(24 * 60 * 60 * 1000);
}
private static CamelContext startCamel() throws Exception {
log("start Camel Context...");
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(createBasicRoute(camelContext));
camelContext.start();
return camelContext;
}
static RouteBuilder createBasicRoute(CamelContext camelContext) {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.multicast().parallelProcessing()
.aggregationStrategy(new
StringAggregationStrategy())
.timeout(10000)
.to("direct:a")
.to("direct:b")
.to("direct:c")
.to("direct:def")
.end()
.to("file:c:/tmp/outbox?charset=iso-8859-1");
from("direct:def").routeId("def")
.split(body().tokenize("message-")).parallelProcessing()
.aggregationStrategy(new
StringAggregationStrategy())
.timeout(10000)
.to("direct:d")
.to("direct:e")
.to("direct:f")
.end();
from("direct:a").routeId("a")
.process(new CustomProcessor("route-A"));
from("direct:b").routeId("b")
.process(new CustomProcessor("route-B"));
from("direct:c").routeId("c")
.process(new CustomProcessor("route-C"));
from("direct:d").routeId("d")
.process(new CustomProcessor("route-D"));
from("direct:e").routeId("e")
.process(new CustomProcessor("route-E"));
from("direct:f").routeId("f")
.process(new CustomProcessor("route-F"));
}
};
}
public static void log(String msg) {
long now = System.currentTimeMillis();
START_TIME.compareAndSet(0, now);
long elapsed = now - START_TIME.get();
String name = Thread.currentThread().getName();
System.out.format("%2$-4s %1$-26s %3$s\n", name, elapsed, msg);
}
static private class CustomProcessor implements Processor {
String routeId;
public CustomProcessor(String route) {
this.routeId = route;
}
@Override
public void process(Exchange exchange) throws Exception {
log("Start processing for the route " + routeId + " " +
exchange.getMessage().getBody(String.class));
int waitingTime = ThreadLocalRandom.current().nextInt(1, 2);
Thread.sleep(waitingTime * 1000);
String message = exchange.getMessage().getBody(String.class);
message += "-" + routeId + "-";
exchange.getMessage().setBody(message, String.class);
log("End processing for the route " + routeId + " after "
+ waitingTime + "s " + message);
}
}
static private class CostumAggregationStrategy extends
UseLatestAggregationStrategy {
public void timeout(Exchange oldExchange, int index, int
total, long timeout) {
System.out.println("timeout...");
}
}
}