Hi All,
I’m observing an intermittent UnsupportedOperationException in Splitter EIP
with parallel processing when the pooled exchange is enabled. The scenario
is to flatten a nested list using a recursive route. The route has
streaming and parallel processing enabled.
The following exception is raised ~7 out of 10 runs
11:32:17.179 [Camel (camel-1) thread #2 - Split] ERROR
org.apache.camel.processor.errorhandler.DefaultErrorHandler -- Failed
delivery for (MessageId: 6B26CA6266F8B1A-000000000000016E on ExchangeId:
6B26CA6266F8B1A-000000000000016E). Exhausted after delivery attempt: 1
caught: java.lang.UnsupportedOperationException: Is this really correct ?
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID
Processor Elapsed (ms)
route1/route1
from[direct://processList] 8
...
route1/split1
split[simple{${body}}] 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.UnsupportedOperationException: Is this really correct ?
at
org.apache.camel.processor.MulticastProcessor.wrapInErrorHandler(MulticastProcessor.java:1063)
at
org.apache.camel.processor.MulticastProcessor.createProcessorExchangePair(MulticastProcessor.java:1045)
at
org.apache.camel.processor.Splitter$SplitterIterable$1.next(Splitter.java:243)
at
org.apache.camel.processor.Splitter$SplitterIterable$1.next(Splitter.java:184)
at
org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.getNextProcessorExchangePair(MulticastProcessor.java:640)
at
org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.run(MulticastProcessor.java:557)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:64)
at
org.apache.camel.processor.MulticastProcessor.lambda$doProcess$0(MulticastProcessor.java:362)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Test Code
public class FlattenListTest extends CamelTestSupport {
private static final int L1 = 20;
private static final int L2 = 5;
private static final int L3 = 5;
@Override
protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("direct:processList")
.choice()
.when(exchange -> (exchange.getIn().getBody() instanceof
List))
.split(body()).streaming().parallelProcessing(true)
.to("direct:processList")
.endChoice()
.otherwise()
.log("${body}")
.to("mock:result")
.end();
}
};
}
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
ExtendedCamelContext ecc = camelContext.getCamelContextExtension();
ecc.setExchangeFactory(new PooledExchangeFactory());
ecc.setProcessorExchangeFactory(new
PooledProcessorExchangeFactory());
ecc.getExchangeFactory().setStatisticsEnabled(true);
ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
return camelContext;
}
@Test
public void testSplitter() throws Exception {
List<List<List<Integer>>> data = new ArrayList<>();
int num = 0;
for (int i = 0; i < L1; i++) { // Outer level
List<List<Integer>> level2 = new ArrayList<>();
for (int j = 0; j < L2; j++) { // Middle level
List<Integer> level3 = new ArrayList<>();
for (int k = 0; k < L3; k++) { // Inner level
level3.add(num++);
}
level2.add(level3);
}
data.add(level2);
}
getMockEndpoint("mock:result").expectedMessageCount(num);
template.sendBody("direct:processList", data);
MockEndpoint.assertIsSatisfied(context);
}
}
The route should flatten the nested list and deliver all individual
elements to the final endpoint without exceptions or data loss. Please let
me know if this is a known limitation or if I’m misusing pooled exchanges
in this scenario.
Camel Version: 4.18.0
Java Version: 17