Andre Weickel created CAMEL-20214:
-------------------------------------

             Summary: "Timeout" tasks of parallel splitter block further 
message processing
                 Key: CAMEL-20214
                 URL: https://issues.apache.org/jira/browse/CAMEL-20214
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 3.21.2, 3.14.10
            Reporter: Andre Weickel


After an update from Camel 2.x to Camel 3.14.7 we noticed the following issue 
in all newer Camel 3 versions:

By default, a parallel splitter uses a ThreadPoolProfile with maxQueueSize = 
1000.
If the route is called 1001 times within the configured splitter timeout, one 
message fails with "java.util.concurrent.RejectedExecutionException: Task 
rejected due queue size limit reached", which is thrown by the 
SizedScheduledExecutorService class.
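
The rejection can be illustrated without Camel. The following is only a minimal sketch using plain java.util.concurrent: BoundedSchedulerSketch and its methods are hypothetical names, and the size check is a simplified stand-in for what SizedScheduledExecutorService is assumed to do, not a copy of its code.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a size-limited scheduled executor (not Camel's class).
public class BoundedSchedulerSketch {

    private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
    private final int maxQueueSize;

    public BoundedSchedulerSketch(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Schedules a delayed task, rejecting it if the delay queue already holds maxQueueSize entries.
    public void scheduleTimeout(Runnable task, long delayMillis) {
        if (delegate.getQueue().size() >= maxQueueSize) {
            throw new RejectedExecutionException("Task rejected due queue size limit reached");
        }
        delegate.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
    }

    public void shutdownNow() {
        delegate.shutdownNow();
    }

    // Simulates `calls` route invocations, each adding one long-delayed "timeout" task,
    // and returns how many of them were rejected.
    public static int countRejected(int maxQueueSize, int calls) {
        BoundedSchedulerSketch scheduler = new BoundedSchedulerSketch(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < calls; i++) {
                try {
                    // 300000 ms mirrors a long splitter timeout; the task itself does nothing
                    scheduler.scheduleTimeout(() -> { }, 300_000);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdownNow();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(1000, 1001));
    }
}
```

In this sketch the first 1000 delayed tasks fill the queue and the 1001st is rejected, because none of the delays has expired yet.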

For each call of the route, one "timeout" task is added to the DelayedWorkQueue 
that is used by the "Splitter-AggregateTask" thread. Each of these "timeout" 
tasks waits until its timeout is reached, even though the message processing has 
already completed. That means the 1000 messages have already been processed 
successfully, but the 1000 "timeout" tasks are still in the DelayedWorkQueue and 
block further message processing because the queue is full.
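
The lingering of a delayed task after its work is done can be shown with the JDK alone. This is a sketch under assumptions: LingeringTimeoutSketch is a hypothetical name, and the cancellation step only demonstrates how a plain ScheduledThreadPoolExecutor frees a queue slot; it is not a statement about what Camel does or should do.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a delayed "timeout" task stays queued after the work has finished.
public class LingeringTimeoutSketch {

    // Returns { queue size after the work completed, queue size after cancelling the timeout }.
    public static int[] run() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        // Without this policy a cancelled task would remain in the queue until its delay elapses
        scheduler.setRemoveOnCancelPolicy(true);
        try {
            // One "timeout" task per message, scheduled far in the future
            ScheduledFuture<?> timeout =
                    scheduler.schedule(() -> { }, 300_000, TimeUnit.MILLISECONDS);

            // The message processing itself would finish here, long before the timeout

            int afterWork = scheduler.getQueue().size();   // the timeout task is still queued
            timeout.cancel(false);
            int afterCancel = scheduler.getQueue().size(); // the slot is released on cancel
            return new int[] { afterWork, afterCancel };
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("after work: " + sizes[0] + ", after cancel: " + sizes[1]);
    }
}
```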

We found a comment in the MulticastProcessor.doStart() method stating that an 
unbounded thread pool has to be used for the "Splitter-AggregateTask" thread to 
avoid issues:
{code:java}
/* use unbounded thread pool so we ensure the aggregate on-the-fly task always 
will have assigned a thread and run the tasks when the task is submitted. If 
not then the aggregate task may not be able to run and signal completion during 
processing, which would lead to what would appear as a dead-lock or a slow 
processing */{code}
Therefore we assume that the maxQueueSize should also be unlimited when the 
thread pool is created in MulticastProcessor.createAggregateExecutorService().

A short test to reproduce the mentioned issue (maxQueueSize is set to 1 to 
reproduce the issue with only two calls):

 
{code:java}
import java.io.IOException;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.ThreadPoolProfile;
// Assumption: the JUnit 5 flavour of CamelTestSupport (camel-test-junit5) is used
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

public class SplitterTest extends CamelTestSupport {

    String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
    String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";

    @Test
    public void testSplitter() throws InterruptedException, IOException {
        MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
        // Two payloads with two items each should produce four split messages
        mockEndpoint.expectedMessageCount(4);

        template.sendBody("direct:start", payload1);
        // The second call is rejected because the first "timeout" task still fills the queue
        template.sendBody("direct:start", payload2);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
                myThreadPoolProfile.setMaxPoolSize(20);
                myThreadPoolProfile.setPoolSize(10);
                // Queue size of 1 so that the issue shows up with only two calls
                myThreadPoolProfile.setMaxQueueSize(1);

                getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

                from("direct:start")
                        .split()
                            .xpath("//items/item")
                            .parallelProcessing(true)
                            .streaming(true)
                            .stopOnException(true)
                            .timeout("300000")
                            .executorServiceRef("testProfile")
                            .to("mock:split");
            }
        };
    }

}{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
