Hi,

I am using an aggregator to concatenate the body of a bunch of messages into
a single message. The following code illustrates the scenario:

<code>
package my.package;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spring.Main;

public class MyRouteBuilder extends RouteBuilder {
        private int messageIndex = 0;

    public static void main(String... args) {
        Main.main(args);
    }

    public void configure() {
        
        from("timer://foo?period=500")
                .process(new Processor() {
                                
                                public void process(Exchange exchange) throws 
Exception {
                                        exchange.getOut().setBody("[myBody-" + 
(messageIndex++) + "]");
                                        
                                        
exchange.getOut().setHeader("aggregateGroup", "group1");
                                }
                        }).to("direct:step1");

        from("direct:step1").multicast().to("direct:step2", "direct:step3");
        
        from("direct:step2").to("direct:aggregator");
        from("direct:step3").to("direct:aggregator");
        
        from("direct:aggregator").aggregate(header("aggregateGroup"), new
AggregationStrategy() {
                        
                        public Exchange aggregate(Exchange oldExchange, 
Exchange newExchange) {
                                
                                System.out.println("Getting new exchange in 
aggretator: " +
newExchange);
                                
                                if(oldExchange == null){
                                        return newExchange;
                                }else{
                                        
oldExchange.getOut().setBody((String)newExchange.getIn().getBody() +
(String)oldExchange.getIn().getBody());
                                }
                                
                                return oldExchange;
                        }
                }).batchSize(10).batchTimeout(2000L).process(new Processor() {
                        
                        public void process(Exchange exchange) throws Exception 
{
                                System.out.println("Received group: " + 
exchange.getIn().getBody() + " -
" + exchange.getIn().getHeader("aggregateGroup"));
                        }
                });
        
    }
}
</code>

When running, this route configuration generates the following output:

<output>
lease use a packageScan element instead.
[pache.camel.spring.Main.main()] SpringCamelContext             INFO 
Starting Apache Camel as property ShouldStartContext is true
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.0.0 (CamelContext:camelContext) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
Camel 2.0.0 (CamelContext:camelContext) started
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Received group:
[myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0]
- group1
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Received group:
[myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5]
- group1
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Received group:
[myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10]
- group1
</output>

The output shows that all messages are passing through the Aggregator but
the last one is missing in the resulting Exchange body. For example: the
first message group is composed by 10 messages:
{[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
but the resulting body is missing the last message ([myBody-14]).

Am I doing anything wrong?

Thank you.

--
Wilson Freitas
Vetta Technologies
http://www.vettatech.com
-- 
View this message in context: 
http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to