Hello Camel community,
We are currently facing an issue with a Camel route that uses a splitter with
parallelProcessing. The route includes an asynchronous processor and a delay
within the splitter. However, the problem is that the split threads do not wait
for the asynchronous process to complete before proceeding further.
I have tested this with several Camel versions (2.24, 3.14, and 4.8), and the
behavior differs between 2.x and the newer versions.
Although the maximum number of threads for split processing is set to 2, the
async processor means we have no control over the actual thread usage. In
Camel 3.14 and Camel 4.8, the 2 split threads hand the message over to the
async processor and then immediately continue with the next 2 split parts,
which are in turn handed over to another thread pool, and so on. This can lead
to hundreds of threads executing the operations defined within the split block
in parallel. If these operations are long-running and memory-intensive, the
resulting load can cause further problems. In the older, unmaintained 2.x
version this was not the case. Although the executing thread changed because
of the async processor, the split threads were (I assume) in a
blocking/waiting state. At least the number of threads did not grow without
bound.
Here is a simplified test setup for reference:
The route includes:
- split() with parallelProcessing().
- An AsyncProcessor that simulates asynchronous work using CompletableFuture.
- A .delay() after the asynchronous processing.
@Test
public void testParallelProcessing() throws Exception {
    getMockEndpoint("mock:split").expectedMessageCount(10);
    String xmlBody = "<employees>" +
            "<employee><id>1</id><name>John</name></employee>" +
            "<employee><id>2</id><name>Jane</name></employee>" +
            "<employee><id>3</id><name>Jim</name></employee>" +
            "<employee><id>4</id><name>Jack</name></employee>" +
            "<employee><id>5</id><name>Jill</name></employee>" +
            "<employee><id>6</id><name>opi</name></employee>" +
            "<employee><id>7</id><name>ds</name></employee>" +
            "<employee><id>8</id><name>hhh</name></employee>" +
            "<employee><id>9</id><name>fki</name></employee>" +
            "<employee><id>10</id><name>abc</name></employee>" +
            "</employees> ";
    template.sendBody("direct:start", xmlBody);
    assertMockEndpointsSatisfied();
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // Thread pool intended to limit split processing to 2 threads
            ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("threadPoolProfile");
            myThreadPoolProfile.setMaxPoolSize(2);
            myThreadPoolProfile.setPoolSize(2);
            myThreadPoolProfile.setMaxQueueSize(2);
            myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
            getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

            AsyncProcessor asyncProcessor = new AsyncProcessor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // Synchronous variant: left empty in this test
                }

                @Override
                public boolean process(Exchange exchange, AsyncCallback callback) {
                    // Run the processing in a separate thread
                    CompletableFuture.runAsync(() -> {
                        try {
                            // Simulate some asynchronous processing
                            Thread.sleep(1000);
                            exchange.getIn().setBody("Processed asynchronously");
                        } catch (InterruptedException e) {
                            exchange.setException(e);
                        } finally {
                            // Signal completion
                            callback.done(false);
                        }
                    });
                    // Return false to indicate that processing is not complete yet
                    return false;
                }

                @Override
                public CompletableFuture<Exchange> processAsync(Exchange exchange) {
                    // Not used in this test
                    return null;
                }
            };

            from("direct:start")
                .split()
                    .xpath("/employees/employee")
                    .parallelProcessing()
                    .stopOnException()
                    .timeout("300000")
                    .executorService("threadPoolProfile")
                    .process(e -> System.out.println("Before AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                    .process(asyncProcessor)
                    .process(e -> System.out.println("After AsyncProcessor - Thread: " + Thread.currentThread().getName()))
                    .delay(5000)
                .end()
                .to("mock:split");
        }
    };
}
Result in Camel 2.24:
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
Before AsyncProcessor - Thread: Camel (camel-1) thread #1 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
Result in 3.14 and 4.8:
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #2 - Split
Before AsyncProcessor - Thread: Camel (camel-1) thread #3 - Split
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-6
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-3
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-5
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-10
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-8
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-2
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-9
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-7
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-1
After AsyncProcessor - Thread: ForkJoinPool.commonPool-worker-4
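The difference between the two outputs can be modeled with plain JDK classes. The following is only a minimal sketch and not Camel code: the class HandOffModel and the method peakInFlight are hypothetical names, and the "blocking" mode merely reflects my assumption about what the 2.x split threads do. It counts how many asynchronous tasks are in flight at the same time when 2 dispatching threads either wait for each task or hand it off and move on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffModel {

    /** Returns the peak number of async tasks that were in flight at once. */
    static int peakInFlight(int parts, int splitThreads, boolean blockUntilDone) {
        // Stand-in for the split thread pool (2 threads in the test above)
        ExecutorService splitPool = Executors.newFixedThreadPool(splitThreads);
        // Stand-in for the pool that runs the asynchronous work
        ExecutorService asyncPool = Executors.newCachedThreadPool();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(parts);
        try {
            for (int i = 0; i < parts; i++) {
                splitPool.execute(() -> {
                    CompletableFuture<Void> work = CompletableFuture.runAsync(() -> {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        try {
                            Thread.sleep(200); // simulated long-running step
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            inFlight.decrementAndGet();
                            allDone.countDown();
                        }
                    }, asyncPool);
                    if (blockUntilDone) {
                        work.join(); // dispatching thread waits for the async task
                    }
                    // otherwise the dispatching thread is free for the next part
                });
            }
            allDone.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            splitPool.shutdown();
            asyncPool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("blocking hand-off, peak in flight: " + peakInFlight(10, 2, true));
        System.out.println("non-blocking hand-off, peak in flight: " + peakInFlight(10, 2, false));
    }
}
```

In the blocking mode the peak cannot exceed the number of dispatching threads, while in the non-blocking mode it is limited only by the number of parts and the async pool.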
I’ve adjusted the thread pool size and configurations but couldn’t resolve the
issue. I suspect this may be related to changes in how Camel handles
asynchronous processing and threading in versions beyond 2.24.
Could you please confirm if this is a known issue or if there are specific
configurations needed in newer Camel versions to achieve the expected behavior?
Any guidance or suggestions would be highly appreciated.
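One direction that might work on the application side, independent of any Camel configuration, is to run the asynchronous work on a bounded executor instead of the common ForkJoinPool. The sketch below uses only JDK classes; BoundedAsyncWork and peakRunning are hypothetical names, and whether the steps after the async processor would also stay on these threads in Camel is an assumption I have not verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncWork {

    /** Runs the given number of tasks on a fixed pool and returns the peak concurrency. */
    static int peakRunning(int parts, int maxConcurrent) {
        // At most maxConcurrent tasks run at once; the rest wait in the pool's queue
        ExecutorService boundedPool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < parts; i++) {
                // Two-argument runAsync: the task runs on boundedPool, not the common pool
                futures.add(CompletableFuture.runAsync(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(100); // simulated asynchronous processing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, boundedPool));
            }
            // Wait until every task has finished
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            boundedPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak running tasks: " + peakRunning(10, 2));
    }
}
```

Note that a fixed pool created this way has an unbounded queue, so pending parts accumulate as queued tasks rather than as threads; this bounds thread usage but not the number of exchanges held in memory.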
Thank you for your support, and I look forward to hearing from you.
Best regards,
Soheila Esmaeili
Pronouns: She
Senior Developer, TI SAP CP INT CPI (SE)
SAP SE Dietmar-Hopp-Allee 16, 69190 Walldorf, Germany