Hi,

Do you "lose" a message every time you run the unit test? Have you tried a higher batch timeout?

On Tue, Oct 20, 2009 at 4:43 PM, Wilson <wilson.li...@gmail.com> wrote:
>
> Hi,
>
> I am using an aggregator to concatenate the body of a bunch of messages into
> a single message. The following code illustrates the scenario:
>
> <code>
> package my.package;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.spring.Main;
>
> public class MyRouteBuilder extends RouteBuilder {
>     private int messageIndex = 0;
>
>     public static void main(String... args) {
>         Main.main(args);
>     }
>
>     public void configure() {
>
>         from("timer://foo?period=500")
>             .process(new Processor() {
>
>                 public void process(Exchange exchange) throws Exception {
>                     exchange.getOut().setBody("[myBody-" + (messageIndex++) + "]");
>
>                     exchange.getOut().setHeader("aggregateGroup", "group1");
>                 }
>             }).to("direct:step1");
>
>         from("direct:step1").multicast().to("direct:step2", "direct:step3");
>
>         from("direct:step2").to("direct:aggregator");
>         from("direct:step3").to("direct:aggregator");
>
>         from("direct:aggregator").aggregate(header("aggregateGroup"), new AggregationStrategy() {
>
>             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>                 System.out.println("Getting new exchange in aggretator: " + newExchange);
>
>                 if(oldExchange == null){
>                     return newExchange;
>                 }else{
>                     oldExchange.getOut().setBody((String)newExchange.getIn().getBody() + (String)oldExchange.getIn().getBody());
>                 }
>
>                 return oldExchange;
>             }
>         }).batchSize(10).batchTimeout(2000L).process(new Processor() {
>
>             public void process(Exchange exchange) throws Exception {
>                 System.out.println("Received group: " + exchange.getIn().getBody() + " - " + exchange.getIn().getHeader("aggregateGroup"));
>             }
>         });
>
>     }
> }
> </code>
>
> When running, this route configuration generates the following output:
>
> <output>
> lease use a packageScan element instead.
> [pache.camel.spring.Main.main()] SpringCamelContext INFO Starting Apache Camel as property ShouldStartContext is true
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.0.0 (CamelContext:camelContext) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.0.0 (CamelContext:camelContext) started
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Received group: [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0] - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Received group: [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5] - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Received group: [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10] - group1
> </output>
>
> The output shows that all messages are passing through the Aggregator but
> the last one is missing in the resulting Exchange body. For example: the
> first message group is composed by 10 messages:
> {[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
> but the resulting body is missing the last message ([myBody-14]).
>
> Am I doing anything wrong?
>
> Thank you.
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context:
> http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>

--
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus