Hello Camel community,

We are currently facing an issue with a Camel route that uses a splitter with parallelProcessing. The route includes an asynchronous processor and a delay within the splitter. The problem is that the split threads do not wait for the asynchronous process to complete before proceeding further.
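To make the effect concrete independently of Camel, here is a minimal plain-Java sketch of the two hand-off styles. It is not Camel code and does not claim to show how the splitter is implemented; the class name HandOffDemo, the method peakInFlight, the cached worker pool, and the 200 ms sleep are all assumptions chosen for illustration. A fixed pool of "dispatcher" threads stands in for the split threads, and each dispatcher hands its work to a second pool, either waiting for it to finish or returning immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffDemo {

    /**
     * Pushes `parts` work items through `dispatchers` dispatcher threads.
     * Each dispatcher hands the item to a separate worker pool. When
     * waitForCompletion is true the dispatcher blocks until the worker is
     * done; otherwise it returns at once and picks up the next item.
     * Returns the peak number of work items that were running at the same time.
     */
    public static int peakInFlight(int parts, int dispatchers, boolean waitForCompletion)
            throws Exception {
        ExecutorService dispatcherPool = Executors.newFixedThreadPool(dispatchers);
        // Hypothetical stand-in for the pool that runs the asynchronous work.
        ExecutorService workerPool = Executors.newCachedThreadPool();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(parts);
        try {
            for (int i = 0; i < parts; i++) {
                dispatcherPool.submit(() -> {
                    CompletableFuture<Void> work = CompletableFuture.runAsync(() -> {
                        // Record how many work items are active right now.
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        try {
                            Thread.sleep(200); // simulated long-running step
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            inFlight.decrementAndGet();
                            allDone.countDown();
                        }
                    }, workerPool);
                    if (waitForCompletion) {
                        work.join(); // dispatcher thread stays occupied
                    }
                });
            }
            allDone.await();
            return peak.get();
        } finally {
            dispatcherPool.shutdown();
            workerPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking hand-off, peak in flight:     " + peakInFlight(10, 2, true));
        System.out.println("non-blocking hand-off, peak in flight: " + peakInFlight(10, 2, false));
    }
}
```

With two dispatchers, the blocking variant can never have more than two work items running at once, because each dispatcher is held until its item completes. In the non-blocking variant the dispatchers are released immediately, so the number of concurrent work items is limited only by the worker pool and will typically reach all ten.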
I’ve tested this in multiple Camel versions (2.24, 3.14, and 4.8), and the behavior differs between them.

Although the maximum number of threads for split processing is set to 2, we have no control over thread usage because of the async processor. In Camel 3.14 and Camel 4.8, the 2 split threads forward the message to the async processor and then immediately continue with the next 2 split parts, which are also handed over to another thread pool, and so on. This can lead to hundreds of parallel threads performing the operations defined within the split block. If these operations are long-running and memory-intensive, this can cause further problems.

In the older, unmaintained 2.x Camel version, this was not the case. Although the threads changed because of the async processor, the “split-related threads” were (I assume) in a blocking/waiting state. At least there was no “unbounded” upscaling in the number of threads.

Here is a simplified test setup for reference. The route includes:
- split() with parallelProcessing().
- An AsyncProcessor that simulates asynchronous work using CompletableFuture.
- A .delay() after the asynchronous processing.

    @Test
    public void testParallelProcessing() throws Exception {
        getMockEndpoint("mock:split").expectedMessageCount(10);
        String xmlBody = "<employees>"
                + "<employee><id>1</id><name>John</name></employee>"
                + "<employee><id>2</id><name>Jane</name></employee>"
                + "<employee><id>3</id><name>Jim</name></employee>"
                + "<employee><id>4</id><name>Jack</name></employee>"
                + "<employee><id>5</id><name>Jill</name></employee>"
                + "<employee><id>6</id><name>opi</name></employee>"
                + "<employee><id>7</id><name>ds</name></employee>"
                + "<employee><id>8</id><name>hhh</name></employee>"
                + "<employee><id>9</id><name>fki</name></employee>"
                + "<employee><id>10</id><name>abc</name></employee>"
                + "</employees> ";
        template.sendBody("direct:start", xmlBody);
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("threadPoolProfile");
                myThreadPoolProfile.setMaxPoolSize(2);
                myThreadPoolProfile.setPoolSize(2);
                myThreadPoolProfile.setMaxQueueSize(2);
                myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
                getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

                AsyncProcessor asyncProcessor = new AsyncProcessor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                    }

                    @Override
                    public boolean process(Exchange exchange, AsyncCallback callback) {
                        // Run the processing in a separate thread
                        CompletableFuture.runAsync(() -> {
                            try {
                                // Simulate some asynchronous processing
                                Thread.sleep(1000);
                                exchange.getIn().setBody("Processed asynchronously");
                            } catch (InterruptedException e) {
                                exchange.setException(e);
                            } finally {
                                // Signal completion
                                callback.done(false);
                            }
                        });
                        // Return false to indicate that processing is not complete yet
                        return false;
                    }

                    @Override
                    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
                        return null;
                    }
                };

                from("direct:start")
                    .split()
                        .xpath("/employees/employee")
                        .parallelProcessing()
                        .stopOnException()
                        .timeout("300000")
                        .executorService("threadPoolProfile")
                        .process(e -> System.out.println("Before AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                        .process(asyncProcessor)
                        .process(e -> System.out.println("After AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                        .delay(5000)
                    .end()
                    .to("mock:split");
            }
        };
    }

Result in Camel 2.24:

    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
    Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
    Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1

Result in 3.14 and 4.8:

    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
    Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-6
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-3
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-5
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-10
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-8
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-9
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-7
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
    After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-4

I’ve adjusted the thread pool size and configurations but couldn’t resolve the issue. I suspect this may be related to changes in how Camel handles asynchronous processing and threading in versions beyond 2.24.

Could you please confirm if this is a known issue or if there are specific configurations needed in newer Camel versions to achieve the expected behavior? Any guidance or suggestions would be highly appreciated.

Thank you for your support, and I look forward to hearing from you.

Best regards,
Soheila Esmaeili
Senior Developer, TI SAP CP INT CPI (SE)
SAP SE