Try this:

In your first processor, you are setting the "out" body, which will strip
off any other exchange values like headers.  Change this to
getIn().setBody(input);

Next, change your first route to the following:

        from("direct:start")
                .setHeader("myCorrelationId", simple("${random(1000)}",
Integer.class))
                .multicast()
                .to("direct:route1", "direct:route2")
                .end()

Then, where you want to aggregate, change the route to:

           from("direct:route2")
                .aggregate(header("myCorrelationId", new
JoinReplyAggregationStrategy()).
completionPredicate(header("aggregated").isEqualTo(2)))
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception
{
                        System.out.println("################");
System.out.println(exchange.getIn().getBody(String.class));
                    }
                });

I didn't compile this, so make sure that there are the correct number of
parentheses, etc.

On Sat, Oct 29, 2016 at 7:59 AM, Steve973 <steve...@gmail.com> wrote:

> Yes. You need to set a header on message 1 and message 2 and use it as the
> aggregation id.  The value of this header should be the same value for both
> messages of the pair, but different pairs should have unique ids.
>
> On Oct 29, 2016 4:07 AM, "meng" <tianmeng...@gmail.com> wrote:
>
>> Hi Steve,
>>
>> Thanks for replying.
>> I made some changes, here is the code:
>>
>>         from("direct:start")
>>                 .multicast()
>>                 .to("direct:route1", "direct:route2")
>>                 .end()
>>                 ;
>>
>>         from("direct:route1")
>>                 .multicast()
>>                 .parallelProcessing()
>>                 .aggregationStrategy(new JoinReplyAggregationStrategy())
>>                 .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
>> TARGET7, TARGET8)
>>
>>                 .end()
>>                 .process(new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange) throws
>> Exception
>> {
>>                         String input =
>> exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
>>                         exchange.getOut().setBody(input);
>>                         System.out.println("*****************"+input);
>>                     }
>>                 })
>>                .to("direct:route2");
>>
>>            from("direct:route2")
>>                 .process(new Processor() {
>>                     @Override
>>                     public void process(Exchange exchange) throws
>> Exception
>> {
>>                         System.out.println("################");
>>
>> System.out.println(exchange.getIn().getBody(String.class));
>>                     }
>>                 });
>>
>> Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
>> 'direct:route2'.
>> 'direct:route2' does some processing and get a result message2 then sent
>> it
>> to 'direct:route2'.
>> So 'direct:route2' now receive two massages. I can now print both message1
>> and message2. But I want to concatenate these two messages in
>> 'direct:route2' as one. Is there any way?
>>
>> Thanks,
>> Meng
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.
>> com/Problem-Concatenate-the-different-input-from-different-
>> route-using-aggreagationStrategy-tp5789368p5789426.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>

Reply via email to