Hi,
I am using an aggregator to concatenate the bodies of a batch of messages into
a single message. The following code illustrates the scenario:
<code>
package my.package;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spring.Main;

public class MyRouteBuilder extends RouteBuilder {

    private int messageIndex = 0;

    public static void main(String... args) {
        Main.main(args);
    }

    public void configure() {
        from("timer://foo?period=500")
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    exchange.getOut().setBody("[myBody-" + (messageIndex++) + "]");
                    exchange.getOut().setHeader("aggregateGroup", "group1");
                }
            }).to("direct:step1");

        from("direct:step1").multicast().to("direct:step2", "direct:step3");
        from("direct:step2").to("direct:aggregator");
        from("direct:step3").to("direct:aggregator");

        from("direct:aggregator")
            .aggregate(header("aggregateGroup"), new AggregationStrategy() {
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    System.out.println("Getting new exchange in aggretator: " + newExchange);
                    if (oldExchange == null) {
                        return newExchange;
                    } else {
                        oldExchange.getOut().setBody((String) newExchange.getIn().getBody()
                            + (String) oldExchange.getIn().getBody());
                    }
                    return oldExchange;
                }
            }).batchSize(10).batchTimeout(2000L).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("Received group: " + exchange.getIn().getBody()
                        + " - " + exchange.getIn().getHeader("aggregateGroup"));
                }
            });
    }
}
</code>
When run, this route configuration produces the following output:
<output>
lease use a packageScan element instead.
[pache.camel.spring.Main.main()] SpringCamelContext INFO Starting Apache Camel as property ShouldStartContext is true
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.0.0 (CamelContext:camelContext) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.0.0 (CamelContext:camelContext) started
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Received group: [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0] - group1
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Received group: [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5] - group1
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Received group: [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10] - group1
</output>
The output shows that every message passes through the aggregator, but the
last one is missing from the resulting Exchange body. For example, the
first message group is composed of 10 messages:
{[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
but the resulting body contains only 9 of them: the last message (the
second [myBody-4]) is missing.
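To make the expectation concrete, here is a minimal plain-Java sketch (no Camel involved) of the concatenation I would expect for the first group. The names ExpectedAggregation, aggregate, aggregateBatch and firstBatch are made up for illustration and are not part of the route above or of Camel. It assumes the two multicast copies of each timer message reach the aggregator back to back, as the output above suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the route: models only the intended
// string concatenation, not Camel's aggregator.
class ExpectedAggregation {

    // Same ordering as the strategy in the route: the newest body is
    // prepended to whatever has been accumulated so far.
    static String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : newBody + oldBody;
    }

    // Folds one batch of bodies, in arrival order, into a single string.
    // Returns null for an empty batch.
    static String aggregateBatch(List<String> bodies) {
        String result = null;
        for (String body : bodies) {
            result = aggregate(result, body);
        }
        return result;
    }

    // Builds the first group of 10 messages: [myBody-0] .. [myBody-4],
    // each one twice (assumed arrival order).
    static List<String> firstBatch() {
        List<String> bodies = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            String body = "[myBody-" + i + "]";
            bodies.add(body); // copy routed through direct:step2
            bodies.add(body); // copy routed through direct:step3
        }
        return bodies;
    }

    public static void main(String[] args) {
        System.out.println(aggregateBatch(firstBatch()));
    }
}
```

Running this prints ten entries, beginning with two copies of [myBody-4], whereas the route above prints only nine for the same group.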
Am I doing anything wrong?
Thank you.
--
Wilson Freitas
Vetta Technologies
http://www.vettatech.com
--
View this message in context:
http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
Sent from the Camel - Users mailing list archive at Nabble.com.