By experimentation I managed to get the JMS component to do what I needed. I
couldn't get the SJMS one to work as its a bit to set in its ways once it
starts. The tricks were to:
1. add disableReplyTo=true to stop the InOut exchange replying to early
2. Create a custom splitter to return the exchange list in the headers
3. For some reason I had to use an onPrepare to lift the exchange up one
level - not sure why
4. Use the lazy reply logic as defined on the JMS component docs page.
Code below:

from("jms:topic:TOPIC.NAME?disableReplyTo=true")
      .aggregate(constant(true))
      .completionTimeout(2000L)
      .groupExchanges()
      .process(new GroupProcessor())
      .split().method(MyRoutes.class)
      .onPrepare(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
             Exchange realExchange =
exchange.getIn().getBody(Exchange.class);
             exchange.getIn().getHeaders().clear();
            
exchange.getIn().getHeaders().putAll(realExchange.getIn().getHeaders());
             exchange.getIn().setBody(realExchange.getIn().getBody());
          }
      })
      .process(new Processor() {
           @Override
           public void process(Exchange exchange) throws Exception {
                Destination replyDestination =
exchange.getIn().getHeader("JMSReplyTo", Destination.class);
                exchange.getIn().setHeader("JMSDestination",
replyDestination);
                exchange.getIn().removeHeader("JMSReplyTo");
                JmsComponent component =
exchange.getContext().getComponent("jms", JmsComponent.class);
                JmsEndpoint endpoint =
JmsEndpoint.newInstance(replyDestination, component);
                ProducerTemplate producerTemplate =
exchange.getContext().createProducerTemplate();
                producerTemplate.sendBodyAndHeaders(endpoint,
exchange.getIn().getBody(), exchange.getIn().getHeaders());
            }
     });
public class MyRoutes {
    @Handler
    public static List<Exchange> splitMe(@Header(value =
Exchange.GROUPED_EXCHANGE) List<Exchange> body) {
        return body;
    }
}

Is there a simpler way? And would these tricks work with the jetty
component. I really like the SJMS component but it doesn't have the
flexibility.
Also in the above if there is an error then the messages are lost so I need
to add a persistent store in to the route as well.
Nigel




--
View this message in context: 
http://camel.465427.n5.nabble.com/How-to-do-aggregation-of-multiple-InOut-exchanges-from-different-producers-tp5741621p5741988.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to