[ 
https://issues.apache.org/jira/browse/CAMEL-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen resolved CAMEL-20214.
---------------------------------
    Resolution: Fixed

> camel-core - Timeout tasks of parallel splitter block further message 
> processing
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-20214
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20214
>             Project: Camel
>          Issue Type: Bug
>          Components: came-core
>    Affects Versions: 3.14.10, 3.21.2
>            Reporter: Andre Weickel
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 3.21.4, 3.22.0, 4.0.4, 4.3.0
>
>
> After updating from Camel 2.x to Camel 3.14.7 we noticed the following issue 
> in all newer Camel 3 versions:
> By default, a parallel splitter uses a ThreadPoolProfile with maxQueueSize = 
> 1000.
> If the route is called 1001 times within the configured splitter timeout, one 
> message fails with "java.util.concurrent.RejectedExecutionException: Task 
> rejected due queue size limit reached", which is thrown by the 
> SizedScheduledExecutorService class.
> For each call of the route, one "timeout" task is added to the 
> DelayedWorkQueue used by the "Splitter-AggregateTask" thread. Each of these 
> "timeout" tasks waits until its timeout is reached, even though message 
> processing has already completed. That means the 1000 messages have already 
> been processed successfully, but the 1000 "timeout" tasks are still in the 
> DelayedWorkQueue and block further message processing (until the timeout is 
> reached) because the queue is full.
> We found a comment in the MulticastProcessor.doStart() method stating that an 
> unbounded thread pool has to be used for the "Splitter-AggregateTask" thread 
> to avoid issues.
> {code:java}
> /* use unbounded thread pool so we ensure the aggregate on-the-fly task 
> always will have assigned a thread and run the tasks when the task is 
> submitted. If not then the aggregate task may not be able to run and signal 
> completion during processing, which would lead to what would appear as a 
> dead-lock or a slow processing */{code}
> Therefore we assume that maxQueueSize should also be unlimited when the 
> thread pool is created in MulticastProcessor.createAggregateExecutorService().
> A short test to reproduce the mentioned issue (maxQueueSize is set to 1 to 
> reproduce the issue with only two calls):
>  
> {code:java}
> public class SplitterTest extends CamelTestSupport {
>
>     String payload1 =
>         "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
>     String payload2 =
>         "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";
>
>     @Test
>     public void testSplitter() throws InterruptedException, IOException {
>         MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
>         mockEndpoint.expectedMessageCount(4);
>
>         template.sendBody("direct:start", payload1);
>         template.sendBody("direct:start", payload2);
>
>         assertMockEndpointsSatisfied();
>     }
>
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
>                 myThreadPoolProfile.setMaxPoolSize(20);
>                 myThreadPoolProfile.setPoolSize(10);
>                 myThreadPoolProfile.setMaxQueueSize(1);
>                 getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
>
>                 from("direct:start")
>                         .split()
>                             .xpath("//items/item")
>                             .parallelProcessing(true)
>                             .streaming(true)
>                             .stopOnException(true)
>                             .timeout("300000")
>                             .executorServiceRef("testProfile")
>                             .to("mock:split");
>             }
>         };
>     }
>
> }{code}
>  
>  
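The queue behaviour described in the report can be illustrated with the plain JDK scheduler, independent of Camel. The sketch below is only an illustration under stated assumptions: the class name TimeoutQueueDemo and the method pendingTimeouts are hypothetical, it does not use Camel's SizedScheduledExecutorService, and it is not meant to represent the change that resolved this issue. It schedules one long-delay "timeout" task per simulated route call and counts how many are still queued once the work would be done. A wrapper that checks the queue size before accepting a task would reject new submissions once that count reaches its limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only java.util.concurrent, no Camel classes.
class TimeoutQueueDemo {

    /**
     * Schedules one long-delay "timeout" task per simulated route call and
     * returns how many tasks are still queued after the calls have finished.
     */
    static int pendingTimeouts(int calls, boolean cancelWhenDone) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        // Without this policy a cancelled task stays queued until its delay elapses.
        scheduler.setRemoveOnCancelPolicy(true);
        try {
            List<ScheduledFuture<?>> timeouts = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                // 300 s delay, the same value as timeout("300000") in the test above.
                timeouts.add(scheduler.schedule(() -> { }, 300, TimeUnit.SECONDS));
            }
            // The actual message processing would complete here, long before
            // any of the timeout tasks is due.
            if (cancelWhenDone) {
                for (ScheduledFuture<?> timeout : timeouts) {
                    timeout.cancel(false);
                }
            }
            return scheduler.getQueue().size();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued, not cancelled: " + pendingTimeouts(1000, false)); // 1000
        System.out.println("queued, cancelled:     " + pendingTimeouts(1000, true));  // 0
    }
}
```

In this sketch the uncancelled timeout tasks occupy the queue for the full delay even though nothing is left to do, which matches the symptom described above. Cancelling them with the remove-on-cancel policy enabled is one generic JDK-level way to free the queue early.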



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
