Hi, I have designed three routes to process my input message and want to
concatenate their results using my aggregation strategy. Here is my design:
payload -> route 1 ("direct:start"): multicast to route 2 ("direct:a") and
route 3 ("direct:b").
In route 2 ("direct:a"), multicast to 8 servers and use
JoinReplyAggregationStrategy to concatenate the replies. Let's call the
result route2result; it is sent on to route 3 ("direct:b").
Route 3 ("direct:b") therefore has two inputs: the original payload from
route 1 and the result from route 2.
(In the code below, route 2 is "direct:route1" and route 3 is
"direct:route2".)
I also want to use the same JoinReplyAggregationStrategy to concatenate
these two parts, but it throws an exception and the output is only the
original payload from route 1 ("direct:start").
Here is the exception:
Caused by: java.lang.NullPointerException: null
        at mengt.camel.JoinReplyAggregationStrategy.aggregate(JoinReplyAggregationStrategy.java:19) ~[main/:na]
        at org.apache.camel.processor.aggregate.AggregateProcessor.onAggregation(AggregateProcessor.java:629) ~[camel-core-2.16.1.jar:2.16.1]
        at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:447) ~[camel-core-2.16.1.jar:2.16.1]
        ... 29 common frames omitted

Here is the code for my routes and my aggregation strategy:
public void configure() throws Exception{
        from("direct:start")
                .marshal()
                .string(UTF_8)
                .multicast()
                .parallelProcessing()
                .to("direct:route1")
                .to("direct:route2")
        ;

        from("direct:route1")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new JoinReplyAggregationStrategy())
                .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6, TARGET7, TARGET8)
                //.to("log:?level=INFO&showBody=true")
                .timeout(100000)
                .end()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        String input = exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
                        exchange.getOut().setBody(input);
                        System.out.println(input);
                    }
                })
                .to("stream:out")
               // .to("direct:b")
                .to("direct:route2");

        from("direct:route2")
                .aggregate(constant(true), new JoinReplyAggregationStrategy())
                .completionTimeout(500L)
                .to("stream:out");
}

public class JoinReplyAggregationStrategy implements AggregationStrategy{

    public Exchange aggregate(Exchange exchangeOld, Exchange exchangeNew){
        //String body1 = null;
        //static Logger logger = Logger.getLogger(exchangeOld.getIn());
        if(exchangeOld == null){
            return exchangeNew;
        }else {
            String body1 = exchangeOld.getIn().getBody(String.class);
            String body2 = exchangeNew.getIn().getBody(String.class);
            String status = "Http Status " + exchangeOld.getIn().getHeader("CamelHttpResponseCode").toString();
            System.out.println(status);
            String merged = (body1 == null) ? body2 : body1 + "+" +body2;
            exchangeOld.getIn().setBody(merged);
            //System.out.println(merged);
            return exchangeOld;
        }
    }
}
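
One possible reading of the stack trace, offered as an assumption rather than
a confirmed diagnosis: if line 19 of JoinReplyAggregationStrategy.java is the
getHeader("CamelHttpResponseCode").toString() call, then the exception would
occur whenever exchangeOld carries no HTTP response header, which seems likely
for the original payload arriving straight from "direct:start". The sketch
below models only the merge logic in plain Java so that it runs without Camel
on the classpath. Msg and NullSafeJoin are hypothetical stand-in names, not
Camel types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Camel message: a header map plus a String body.
// It exists only so the sketch runs without Camel; it is not a Camel class.
final class Msg {
    final Map<String, Object> headers = new HashMap<>();
    String body;

    Msg(String body) {
        this.body = body;
    }
}

final class NullSafeJoin {

    // Builds the status text without calling toString() on a missing header.
    static String status(Msg msg) {
        Object code = msg.headers.get("CamelHttpResponseCode");
        return "Http Status " + (code == null ? "n/a" : code);
    }

    // Same merge rule as the strategy in the post: join the bodies with "+".
    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return newMsg; // first message of the group
        }
        System.out.println(status(oldMsg));
        oldMsg.body = (oldMsg.body == null)
                ? newMsg.body
                : oldMsg.body + "+" + newMsg.body;
        return oldMsg;
    }

    public static void main(String[] args) {
        // Models the payload from "direct:start": it has no HTTP header.
        Msg payload = new Msg("{\"original\":1}");
        // Models a reply that came back from an HTTP endpoint.
        Msg reply = new Msg("{\"reply\":2}");
        reply.headers.put("CamelHttpResponseCode", 200);

        Msg merged = aggregate(aggregate(null, payload), reply);
        System.out.println(merged.body);
    }
}
```

In the real strategy, the same idea would be to read the header with
exchangeOld.getIn().getHeader("CamelHttpResponseCode", String.class) and to
check the result for null before using it.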

Can I use the aggregation strategy to concatenate the inputs from the two
other routes directly like this, or is my understanding of aggregation wrong?
Thanks,

Meng



--
View this message in context: 
http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368.html
Sent from the Camel - Users mailing list archive at Nabble.com.
