Hello, I've experienced a strange behavior in a camel route: if I removed a logging instruction my Camel route broke. After making some tests I realized that the problem was in the behaviour of the aggregator and the splitter and I'd like you to clear some doubts about the pipeline mechanism. I tried it with both Camel 2.17.7 and 2.19.2.
For example if we expect 2 messages and we implement a custom aggregation that returns an exchange with an OUT like this: public class MyAggregationStrategyFaulty implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String oldBody = oldExchange.getIn().getBody(String.class); String currentBody = newExchange.getIn().getBody(String.class); List<String> newOut = new ArrayList<>(); newOut.add(oldBody); newOut.add(currentBody); oldExchange.getOut().setBody(newOut); return oldExchange; } } and then we implement a route like the following: public class ProvaAggregateFaulty extends CamelTestSupport { private final static Logger logger = LoggerFactory.getLogger(ProvaAggregateFaulty.class); @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .log("Receiving \"${body}\" with correlation key \"${header.myId}\"") .aggregate(header("myId"), new MyAggregationStrategyFaulty()) .completionSize(2) .log("END AGGREGATE SUBROUTE (in, out): \"${in.body}\", \"${out.body}\"") .log("END AGGREGATE SUBROUTE 2 (in, out): \"${in.body}\", \"${out.body}\"") .end() .log("END ROUTE (in, out): \"${in.body}\", \"${out.body}\"") .to("mock:result"); } }; } @Test public void test() throws Exception { context.setTracing(true); template.sendBodyAndHeader("direct:start", "a", "myId", 1); template.sendBodyAndHeader("direct:start", "b", "myId", 1); Thread.sleep(1000 * 10); assertMockEndpointsSatisfied(); } } we can see that the pipeline mechanism (OUT->IN) is not triggered when processing the first node of the aggregator subroute (the log node is seeing a message with both IN and OUT). Note however that if we insert another node after the logging, for example another logging, the pipeline is triggered after the first log : 2017-08-22 11:22:51,588 [main ] INFO route1 - Receiving "a" with correlation key "1" 2017-08-22 11:22:51,591 [main ] INFO route1 - END ROUTE (in, out): "a", "" 2017-08-22 11:22:51,591 [main ] INFO route1 - Receiving "b" with correlation key "1" 2017-08-22 11:22:51,595 [main ] INFO route1 - END AGGREGATE SUBROUTE (in, out): "a", "[a, b]" 2017-08-22 11:22:51,595 [main ] INFO route1 - END AGGREGATE SUBROUTE 2 (in, out): "[a, b]", "" 2017-08-22 11:22:51,595 [main ] INFO route1 - END ROUTE (in, out): "b", "" In my specific case I had to work with a route that had a splitter inside the aggregator, as in the following example: from("direct:start") .log("Receiving \"${body}\" with correlation key \"${header.myId}\"") .aggregate(header("myId"), new MyAggregationStrategyFaulty()) .completionSize(2) .split(body()) .log("SPLIT SUBROUTE (in, out): \"${in.body}\", \"${out.body}\"") .end() .end() .log("END ROUTE (in, out): \"${in.body}\", \"${out.body}\"") .to("mock:result"); } with the following results: 2017-08-22 11:41:54,526 [main ] INFO route1 - Receiving "a" with correlation key "1" 2017-08-22 11:41:54,529 [main ] INFO route1 - END ROUTE (in, out): "a", "" 2017-08-22 11:41:54,531 [main ] INFO route1 - Receiving "b" with correlation key "1" 2017-08-22 11:41:54,540 [main ] INFO route1 - SPLIT SUBROUTE (in, out): "a", "[a, b]" 2017-08-22 11:41:54,540 [main ] INFO route1 - END ROUTE (in, out): "b", "" In this case: - the message arriving to the splitter has both IN and OUT as before (IN contains the first message, OUT contains the aggregated list) , because of the missing pipeline-triggering - the splitter uses body(), but the body() method returns the IN even if the OUT is not null. The splitting incorrectly happens on the IN body instead of the OUT, that is a string containing the first message, so there is only 1 iteration (aside note: the outBody() in Camel seems to be deprecated. What is a correct alternative?) - the aggregation strategy returns an exchange with an OUT as before. I think this is not a correct way to implement an Aggregation Strategy, in fact I didn't find any example in the documentation of an aggregation strategy returning an exchange with an OUT If after the logging the message is written to file the arraylist in the OUT is shifted into the IN and we have an error: from("direct:start") .log("Receiving \"${body}\" with correlation key \"${header.myId}\"") .aggregate(header("myId"), new MyAggregationStrategyFaulty()) .completionSize(2) .log("AGGREGATE SUBROUTE (in, out): \"${in.body}\", \"${out.body}\"") .to("file:data?fileName=asd") .end() .log("END ROUTE (in, out): \"${in.body}\", \"${out.body}\"") .to("mock:result"); 2017-08-22 11:54:00,408 [main ] INFO route1 - Receiving "a" with correlation key "1" 2017-08-22 11:54:00,410 [main ] INFO route1 - END ROUTE (in, out): "a", "" 2017-08-22 11:54:00,411 [main ] INFO route1 - Receiving "b" with correlation key "1" 2017-08-22 11:54:00,415 [main ] INFO route1 - AGGREGATE SUBROUTE (in, out): "a", "[a, b]" 2017-08-22 11:54:00,418 [main ] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID-giuliano-Latitude-E6540-37043-1503395640049-0-7 on ExchangeId: ID-giuliano-Latitude-E6540-37043-1503395640049-0-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: data/asd org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: data/asd ... Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: [a, b] of type: java.util.ArrayList .... ... So in order to correct my error I changed the implementation of the aggregation strategy returning an exchange with the IN instead of the OUT, but I have a few questions: - Is the missing pipeline-triggering inside the aggregator subroute a bug or a desired feature? - Is it correct to implement an aggregation strategy returning an exchange with OUT ? Thanks, Giuliano Santandrea