Hello,
I've experienced a strange behavior in a camel route: if I removed
a logging instruction my Camel route broke.  After making some tests I
realized that the problem was in the behaviour of the aggregator and the
splitter and I'd like you to clear  some doubts about the pipeline
mechanism.
I tried it with both Camel 2.17.7 and 2.19.2.

For example if we expect 2 messages and we implement a custom aggregation
that returns an exchange with an OUT like this:

public class MyAggregationStrategyFaulty implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String currentBody = newExchange.getIn().getBody(String.class);
        List<String> newOut = new ArrayList<>();
        newOut.add(oldBody);
        newOut.add(currentBody);
        oldExchange.getOut().setBody(newOut);
        return oldExchange;
    }
}

and then we implement a route like the following:

public class ProvaAggregateFaulty extends CamelTestSupport {
    private final static Logger logger =
LoggerFactory.getLogger(ProvaAggregateFaulty.class);

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("direct:start")
                        .log("Receiving \"${body}\" with correlation key
\"${header.myId}\"")
                        .aggregate(header("myId"),
                                     new MyAggregationStrategyFaulty())
                                     .completionSize(2)
                            .log("END AGGREGATE SUBROUTE (in,
out): \"${in.body}\", \"${out.body}\"")
                            .log("END AGGREGATE SUBROUTE 2 (in,
out): \"${in.body}\", \"${out.body}\"")
                            .end()

                        .log("END ROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                        .to("mock:result");

            }
        };
    }

    @Test
    public void test() throws Exception {
        context.setTracing(true);

        template.sendBodyAndHeader("direct:start", "a", "myId", 1);
        template.sendBodyAndHeader("direct:start", "b", "myId", 1);

        Thread.sleep(1000 * 10);
        assertMockEndpointsSatisfied();
    }


}

we can see that the pipeline mechanism (OUT->IN) is not triggered
when processing the first node of the aggregator subroute (the log node
is seeing a message with both IN and OUT). Note however that if we insert
another  node after the logging, for example another logging, the pipeline
is triggered after the first log:
2017-08-22 11:22:51,588 [main           ] INFO  route1         - Receiving
"a" with correlation key "1"
2017-08-22 11:22:51,591 [main           ] INFO  route1         - END ROUTE
(in, out): "a", ""
2017-08-22 11:22:51,591 [main           ] INFO  route1         - Receiving
"b" with correlation key "1"
2017-08-22 11:22:51,595 [main           ] INFO  route1         - END
AGGREGATE SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:22:51,595 [main           ] INFO  route1         - END
AGGREGATE SUBROUTE 2 (in, out): "[a, b]", ""
2017-08-22 11:22:51,595 [main           ] INFO  route1         - END ROUTE
(in, out): "b", ""


In my specific case I had to work with a route that had a splitter inside
the aggregator, as in the following example:
                from("direct:start")
                        .log("Receiving \"${body}\" with correlation key
\"${header.myId}\"")
                        .aggregate(header("myId"),
                                new MyAggregationStrategyFaulty())
                                        .completionSize(2)
                                .split(body())
                                      .log("SPLIT SUBROUTE (in,
out): \"${in.body}\", \"${out.body}\"")
                                      .end()
                                .end()
                        .log("END ROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                        .to("mock:result");
            }

with the following results:
2017-08-22 11:41:54,526 [main           ] INFO  route1         - Receiving
"a" with correlation key "1"
2017-08-22 11:41:54,529 [main           ] INFO  route1         - END ROUTE
(in, out): "a", ""
2017-08-22 11:41:54,531 [main           ] INFO  route1         - Receiving
"b" with correlation key "1"
2017-08-22 11:41:54,540 [main           ] INFO  route1         - SPLIT
SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:41:54,540 [main           ] INFO  route1         - END ROUTE
(in, out): "b", ""


In this case:
- the message arriving to the splitter has both IN and OUT as before
(IN contains the first message, OUT contains the aggregated list) ,
because of the missing pipeline-triggering
- the splitter uses body(), but the body() method returns the IN even
if the OUT is not null. The splitting incorrectly happens on the IN
body instead of the OUT, that is a string containing the first
message, so there is only 1 iteration
    (aside note: the outBody() in Camel seems to be deprecated. What
is a correct alternative?)
- the aggregation strategy returns an exchange with an OUT as before.
I think this is not a correct way to implement an Aggregation
Strategy, in fact I didn't find any example in the documentation of an
aggregation strategy returning an exchange with an OUT

If after the logging the message is written to file the arraylist in
the OUT is shifted into the IN and we have an error:

                from("direct:start")
                        .log("Receiving \"${body}\" with correlation key
\"${header.myId}\"")
                        .aggregate(header("myId"),
                                new MyAggregationStrategyFaulty())
                                        .completionSize(2)
                                .log("AGGREGATE SUBROUTE (in,
out): \"${in.body}\", \"${out.body}\"")
                                .to("file:data?fileName=asd")
                                .end()
                        .log("END ROUTE (in, out):
\"${in.body}\", \"${out.body}\"")
                        .to("mock:result");


2017-08-22 11:54:00,408 [main           ] INFO  route1         - Receiving
"a" with correlation key "1"
2017-08-22 11:54:00,410 [main           ] INFO  route1         - END ROUTE
(in, out): "a", ""
2017-08-22 11:54:00,411 [main           ] INFO  route1         - Receiving
"b" with correlation key "1"
2017-08-22 11:54:00,415 [main           ] INFO  route1         - AGGREGATE
SUBROUTE (in, out): "a", "[a, b]"
2017-08-22 11:54:00,418 [main           ] ERROR DefaultErrorHandler
- Failed delivery for (MessageId:
ID-giuliano-Latitude-E6540-37043-1503395640049-0-7 on
ExchangeId:ID-giuliano-Latitude-E6540-37043-1503395640049-0-5). Exhausted
after
delivery attempt: 1
caught: org.apache.camel.component.file.GenericFileOperationFailedException:
Cannot
store file: data/asd
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot
store file: data/asd
...
Caused by: org.apache.camel.InvalidPayloadException: No body available of
type: java.io.InputStream but has value: [a, b] of
type: java.util.ArrayList ....
...



So in order to correct my error I changed the implementation of
the aggregation strategy returning an exchange with the IN instead of
the OUT, but I have a few questions:
- Is the missing pipeline-triggering inside the aggregator subroute a bug
or a desired feature?
- Is it correct to implement an aggregation strategy returning an exchange
with OUT ?

Thanks,
Giuliano Santandrea

Reply via email to