[ https://issues.apache.org/jira/browse/CAMEL-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-20214:
--------------------------------
    Fix Version/s: 3.22.0

> camel-core - Timeout tasks of parallel splitter block further message processing
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-20214
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20214
>             Project: Camel
>          Issue Type: Bug
>          Components: came-core
>    Affects Versions: 3.14.10, 3.21.2
>            Reporter: Andre Weickel
>            Priority: Minor
>             Fix For: 3.22.0, 4.3.0
>
>
> After an update from Camel 2.x to Camel 3.14.7 we noticed the following
> issue in all newer Camel 3 versions:
>
> By default, a parallel splitter uses a ThreadPoolProfile with
> maxQueueSize = 1000.
>
> If the route is called 1001 times within the configured splitter timeout,
> one message fails with "java.util.concurrent.RejectedExecutionException:
> Task rejected due queue size limit reached", which is thrown by the
> SizedScheduledExecutorService class.
>
> For each call of the route, one "timeout" task is added to the
> DelayedWorkQueue used by the "Splitter-AggregateTask" thread. Each of
> these "timeout" tasks waits until its timeout is reached, even though the
> message processing has already completed. That means the 1000 messages
> have already been processed successfully, but the 1000 "timeout" tasks are
> still in the DelayedWorkQueue and, because the queue is full, block
> further message processing until the timeout is reached.
>
> We found a comment in the MulticastProcessor.doStart() method saying that
> an unbounded thread pool has to be used for the "Splitter-AggregateTask"
> thread to avoid issues.
> {code:java}
> /* use unbounded thread pool so we ensure the aggregate on-the-fly task
>    always will have assigned a thread and run the tasks when the task is
>    submitted. If not then the aggregate task may not be able to run and
>    signal completion during processing, which would lead to what would
>    appear as a dead-lock or a slow processing */{code}
> Therefore we assume the maxQueueSize should also be unlimited when the
> thread pool is created in
> MulticastProcessor.createAggregateExecutorService().
>
> A short test to reproduce the issue (maxQueueSize is set to 1 so that it
> can be reproduced with only two calls):
>
> {code:java}
> public class SplitterTest extends CamelTestSupport {
>
>     String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
>     String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";
>
>     @Test
>     public void testSplitter() throws InterruptedException, IOException {
>         MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
>         mockEndpoint.expectedMessageCount(4);
>
>         template.sendBody("direct:start", payload1);
>         template.sendBody("direct:start", payload2);
>
>         assertMockEndpointsSatisfied();
>     }
>
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
>                 myThreadPoolProfile.setMaxPoolSize(20);
>                 myThreadPoolProfile.setPoolSize(10);
>                 myThreadPoolProfile.setMaxQueueSize(1);
>
>                 getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
>
>                 from("direct:start")
>                     .split()
>                     .xpath("//items/item")
>                     .parallelProcessing(true)
>                     .streaming(true)
>                     .stopOnException(true)
>                     .timeout("300000")
>                     .executorServiceRef("testProfile")
>                     .to("mock:split");
>             }
>         };
>     }
> }{code}
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
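
The queueing behaviour described in the report can be illustrated outside of Camel with a plain JDK scheduler. The sketch below is only an approximation under stated assumptions: it uses java.util.concurrent.ScheduledThreadPoolExecutor as a stand-in for Camel's SizedScheduledExecutorService, the class name TimeoutQueueSketch and the method queueSizes are made up for illustration, and the cancel-and-remove step at the end is one possible way to keep the queue from filling up, not necessarily what the actual fix does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of Camel.
class TimeoutQueueSketch {

    /**
     * Schedules one long-delayed "timeout" task per simulated route call and
     * returns { tasks still queued after processing, tasks queued after cancel }.
     */
    static int[] queueSizes(int calls) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Remove cancelled tasks from the delay queue right away
        // instead of leaving them there until their delay elapses.
        executor.setRemoveOnCancelPolicy(true);
        try {
            List<ScheduledFuture<?>> timeouts = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                // The message itself is assumed to be fully processed already;
                // only its 300000 ms timeout task is left behind in the queue.
                timeouts.add(executor.schedule(() -> { }, 300_000, TimeUnit.MILLISECONDS));
            }
            int pendingAfterProcessing = executor.getQueue().size();

            // Cancelling the timeouts of completed work empties the queue.
            for (ScheduledFuture<?> timeout : timeouts) {
                timeout.cancel(false);
            }
            int pendingAfterCancel = executor.getQueue().size();

            return new int[] { pendingAfterProcessing, pendingAfterCancel };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = queueSizes(3);
        System.out.println("queued after processing: " + sizes[0]);
        System.out.println("queued after cancel:     " + sizes[1]);
    }
}
```

With a bounded queue in front of such a scheduler, the first number is what counts against maxQueueSize: every finished call still occupies one slot until its timeout expires, which matches the rejection seen on call 1001 with the default limit of 1000.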