Hi Alex, Regarding your "open question" by your mail below please find a slightly modified version of your routing which now would pass. See also the "XXX" comments of mine. Hope this helps.
Babak public class SplitterWithAggregatorTest extends CamelTestSupport { @Test public void shouldProcessCorrectlyOnBothSources() throws Exception { MockEndpoint split = getMockEndpoint("mock:split"); split.expectedBodiesReceivedInAnyOrder("1", "2", "3"); MockEndpoint result = getMockEndpoint("mock:result2"); result.expectedBodiesReceived("1+2+3"); template.requestBody("direct:start", "A,B,C"); assertMockEndpointsSatisfied(); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") .log("start body: ${body}") .to("direct:process") .log("result body: ${body}") .to("mock:result"); from("direct:process") .split(body()) // .parallelProcessing() XXX: no parallelProcessing as otherwise no gurantee for the expected order (we want the data flow "1" then "2" and at last "3") .log("Split line ${body}") .bean(new Responder()) .to("mock:split") .aggregate(header("myId"), new MyAggregationStrategy()) .completionSize(3) .log("aggregated ${body}") .log("completed by ${property.CamelAggregatedCompletedBy}") .log("test body: ${body}") .to("direct:result2"); from("direct:result2") // XXX: a third route where we get the final result of the whole process // XXX: any possible final processing goes here // ... .to("mock:result2"); } }; } public class Responder { public String translate(Exchange ex, String key) { ex.getIn().setHeader("myId", "correlation id 1"); if ("A".equals(key)) { return "1"; } else if ("B".equals(key)) { return "2"; } else { return "3"; } } } public class MyAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = newExchange.getIn().getBody(String.class); String existing = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(existing + "+" + body); return oldExchange; } } } Aleksander Pena wrote > > Henryk, > > thanks for another way of resolving my requirements it works perfectly > well, but there are missing some important req: I need to setup timeout > dynamically, so the flow could looks like following: > > .when(body().isEqualTo("foo")) > .enrich("direct:emulateLegacyHttp").timeout(property(TIMEOUT)) > .otherwise() > .enrich("direct:emulateNonLegacyHttp").timeout(property(TIMEOUT)) > .end() > > Unfortunately I cannot find timeout with argument of Expression type :( > > It would be the best if I could specify timeout per each flow in route > like > in the below excerpt: > > from("direct:start") > .timeout(property(TIMEOUT)) > .process(new SomeProcessingHere()) > .to("direct:anotherEndpoint"); > > But probably it is difficult to implement such behaviour in Camel. > > Anyway there is still open question why my original example doesn't work. > Is it really a bug in Camel as Babak suggesting? > > Thanks for help, > Alek > -- View this message in context: http://camel.465427.n5.nabble.com/Splitter-aggregator-dynamic-timeout-tp5717166p5717376.html Sent from the Camel - Users mailing list archive at Nabble.com.