Hello Camel community,

We are currently facing an issue with a Camel route that uses a splitter with 
parallelProcessing. The route includes an asynchronous processor and a delay 
within the splitter. However, the problem is that the split threads do not wait 
for the asynchronous process to complete before proceeding further.

I’ve tested this in multiple Camel versions (2.24, 3.14, and 4.8), and the 
behavior differs.

Although the maximum number of threads for split processing is set to 2, we 
have no control over thread usage because of the async processor. In Camel 3.14 
and Camel 4.8, the 2 split threads hand each message over to the async 
processor and then immediately continue with the next 2 split parts, which are 
in turn handed over to another thread pool, and so on. This can lead to 
hundreds of threads executing the operations defined inside the split block in 
parallel. If these operations are long-running and memory-intensive, this can 
lead to further problems. In the older, unmaintained Camel 2.x version, this 
was not the case: although the executing thread changed because of the async 
processor, the “split-related threads” stayed in a blocking/waiting state (I 
assume). At least the number of threads did not grow without bound.
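
To illustrate the effect outside of Camel, here is a minimal plain-JDK sketch. It is not Camel code: the class HandOffDemo, the method peakInFlight, and both pools are hypothetical stand-ins for the split pool and for whatever pool the async processor uses. It only shows why a pool of 2 threads stops limiting concurrency once its threads hand work off without waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffDemo {

    /** Submits `parts` tasks to a 2-thread pool and returns the peak number of parts in flight. */
    public static int peakInFlight(int parts, boolean waitForAsync) {
        ExecutorService splitPool = Executors.newFixedThreadPool(2); // stand-in for the split pool
        ExecutorService asyncPool = Executors.newCachedThreadPool(); // stand-in for the async side
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(parts);
        try {
            for (int i = 0; i < parts; i++) {
                splitPool.submit(() -> {
                    // A part counts as in flight from the moment a split thread picks it up.
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        try {
                            Thread.sleep(200); // simulated asynchronous work
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            inFlight.decrementAndGet();
                            done.countDown();
                        }
                    }, asyncPool);
                    if (waitForAsync) {
                        future.join(); // the split thread waits, so at most 2 parts are in flight
                    }
                });
            }
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("parts did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            splitPool.shutdown();
            asyncPool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak when split threads wait:      " + peakInFlight(10, true));
        System.out.println("peak when split threads do not wait: " + peakInFlight(10, false));
    }
}
```

With waiting, the peak cannot exceed the pool size of 2. Without waiting, the two threads are free again right after the hand-off, so the peak is limited only by the number of parts, which matches what we observe in 3.14 and 4.8.
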
Here is a simplified test setup for reference. The route includes:
   - split() with parallelProcessing().
   - An AsyncProcessor that simulates asynchronous work using CompletableFuture.
   - A .delay() after the asynchronous processing.

@Test
public void testParallelProcessing() throws Exception {

    getMockEndpoint("mock:split").expectedMessageCount(10);

    String xmlBody = "<employees>" +
            "<employee><id>1</id><name>John</name></employee>" +
            "<employee><id>2</id><name>Jane</name></employee>" +
            "<employee><id>3</id><name>Jim</name></employee>" +
            "<employee><id>4</id><name>Jack</name></employee>" +
            "<employee><id>5</id><name>Jill</name></employee>" +
            "<employee><id>6</id><name>opi</name></employee>" +
            "<employee><id>7</id><name>ds</name></employee>" +
            "<employee><id>8</id><name>hhh</name></employee>" +
            "<employee><id>9</id><name>fki</name></employee>" +
            "<employee><id>10</id><name>abc</name></employee>" +
            "</employees> ";

    template.sendBody("direct:start", xmlBody);

    assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {

    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("threadPoolProfile");
            myThreadPoolProfile.setMaxPoolSize(2);
            myThreadPoolProfile.setPoolSize(2);
            myThreadPoolProfile.setMaxQueueSize(2);
            
            myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);

            
            getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

            AsyncProcessor asyncProcessor = new AsyncProcessor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // Intentionally empty: the route is expected to call the asynchronous variant below
                }

                @Override
                public boolean process(Exchange exchange, AsyncCallback callback) {
                    // Run the processing in a separate thread
                    CompletableFuture.runAsync(() -> {
                        try {
                            // Simulate some asynchronous processing
                            Thread.sleep(1000);
                            exchange.getIn().setBody("Processed asynchronously");
                        } catch (InterruptedException e) {
                            exchange.setException(e);
                        } finally {
                            // Signal completion
                            callback.done(false);
                        }
                    });

                    // Return false to indicate that processing is not complete yet
                    return false;
                }

                @Override
                public CompletableFuture<Exchange> processAsync(Exchange exchange) {
                    return null;
                }
            };
            from("direct:start")
                    .split()
                    .xpath("/employees/employee")
                    .parallelProcessing()
                    .stopOnException()
                    .timeout("300000")
                    .executorService("threadPoolProfile")
                    .process(e -> System.out.println("Before AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                    .process(asyncProcessor)
                    .process(e -> System.out.println("After AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                    .delay(5000)
                    .end()
                    .to("mock:split");

        }
    };
}

Result in Camel 2.24:
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1


Result in Camel 3.14 and 4.8:
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-6
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-3
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-5
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-10
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-8
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-9
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-7
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-4

I’ve adjusted the thread pool size and configurations but couldn’t resolve the 
issue. I suspect this may be related to changes in how Camel handles 
asynchronous processing and threading in versions beyond 2.24.

Could you please confirm if this is a known issue or if there are specific 
configurations needed in newer Camel versions to achieve the expected behavior? 
Any guidance or suggestions would be highly appreciated.
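
For context, one workaround we are considering is to cap the number of in-flight parts ourselves with a java.util.concurrent.Semaphore. The sketch below is plain JDK code, not a confirmed Camel configuration; the class BoundedAsyncDemo and the method peakWithPermits are hypothetical names used only for illustration. In the route, the acquire would sit before the hand-off in the async processor and the release next to callback.done(false), but that placement is an assumption on our side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncDemo {

    /** Hands off `parts` async tasks, allowing at most `permits` in flight; returns the observed peak. */
    public static int peakWithPermits(int parts, int permits) {
        Semaphore gate = new Semaphore(permits);
        ExecutorService asyncPool = Executors.newCachedThreadPool(); // stand-in for the async side
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < parts; i++) {
                // The caller blocks here once `permits` parts are already in flight.
                gate.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(100); // simulated asynchronous work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        // Decrement before releasing so the counter never exceeds the permits held.
                        inFlight.decrementAndGet();
                        gate.release();
                    }
                }, asyncPool));
            }
            // Wait for the remaining parts before reporting the peak.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            asyncPool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight with 2 permits: " + peakWithPermits(10, 2));
    }
}
```

This keeps the work asynchronous while bounding it, at the cost of blocking the submitting thread whenever the limit is reached. We would still prefer a supported Camel setting if one exists.
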

Thank you for your support, and I look forward to hearing from you.

Best regards,

Soheila Esmaeili
Pronouns: She
Senior Developer, TI SAP CP INT CPI (SE)
SAP SE Dietmar-Hopp-Allee 16, 69190 Walldorf, Germany
E: .soheila.esmae...@sap.com
T: +49 6227 7-46596


