Hi

Do you "loose" message every time you run the unit test?
Have you tried with a higher batch timeout?



On Tue, Oct 20, 2009 at 4:43 PM, Wilson <wilson.li...@gmail.com> wrote:
>
> Hi,
>
> I am using an aggregator to concatenate the body of a bunch of messages into
> a single message. The following code illustrates the scenario:
>
> <code>
> package my.package;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.spring.Main;
>
> public class MyRouteBuilder extends RouteBuilder {
>        private int messageIndex = 0;
>
>    public static void main(String... args) {
>        Main.main(args);
>    }
>
>    public void configure() {
>
>        from("timer://foo?period=500")
>                .process(new Processor() {
>
>                                public void process(Exchange exchange) throws 
> Exception {
>                                        exchange.getOut().setBody("[myBody-" + 
> (messageIndex++) + "]");
>
>                                        
> exchange.getOut().setHeader("aggregateGroup", "group1");
>                                }
>                        }).to("direct:step1");
>
>        from("direct:step1").multicast().to("direct:step2", "direct:step3");
>
>        from("direct:step2").to("direct:aggregator");
>        from("direct:step3").to("direct:aggregator");
>
>        from("direct:aggregator").aggregate(header("aggregateGroup"), new
> AggregationStrategy() {
>
>                        public Exchange aggregate(Exchange oldExchange, 
> Exchange newExchange) {
>
>                                System.out.println("Getting new exchange in 
> aggretator: " +
> newExchange);
>
>                                if(oldExchange == null){
>                                        return newExchange;
>                                }else{
>                                        
> oldExchange.getOut().setBody((String)newExchange.getIn().getBody() +
> (String)oldExchange.getIn().getBody());
>                                }
>
>                                return oldExchange;
>                        }
>                }).batchSize(10).batchTimeout(2000L).process(new Processor() {
>
>                        public void process(Exchange exchange) throws 
> Exception {
>                                System.out.println("Received group: " + 
> exchange.getIn().getBody() + " -
> " + exchange.getIn().getHeader("aggregateGroup"));
>                        }
>                });
>
>    }
> }
> </code>
>
> When running, this route configuration generates the following output:
>
> <output>
> lease use a packageScan element instead.
> [pache.camel.spring.Main.main()] SpringCamelContext             INFO
> Starting Apache Camel as property ShouldStartContext is true
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
> Camel 2.0.0 (CamelContext:camelContext) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
> Camel 2.0.0 (CamelContext:camelContext) started
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Received group:
> [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0]
> - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Received group:
> [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5]
> - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Received group:
> [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10]
> - group1
> </output>
>
> The output shows that all messages are passing through the Aggregator but
> the last one is missing in the resulting Exchange body. For example: the
> first message group is composed by 10 messages:
> {[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
> but the resulting body is missing the last message ([myBody-14]).
>
> Am I doing anything wrong?
>
> Thank you.
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context: 
> http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to