Hi

> is it actually possible with direct endpoints?
No
When you send an exchange to the aggregator using e.g. a producer template (requestBody), the exchange is completed immediately, because the aggregator is like a queue with an event-driven consumer that does the aggregation. So the route path is "broken" when you hit the aggregator. That is why Camel returns the "input" from the requestBody method.

You cannot send to an aggregator, wait until it has aggregated, and then get the "result". To get the result you must consume from the aggregator, as the mock endpoint does.

Best regards

Claus Ibsen
......................................
Silverbullet
Skovsgårdsvænget 21
8362 Hørning
Tel. +45 2962 7576
Web: www.silverbullet.dk

-----Original Message-----
From: RobMMS [mailto:[EMAIL PROTECTED]
Sent: 7 November 2008 17:00
To: [email protected]
Subject: RE: ExchangePatterns on Bean-Component

Hi,

I enabled the Tracer and it routes as I would expect, but the problem remains that

    List result = (List) template.sendBody("direct:start", ExchangePattern.InOut, myQueryString);

only contains ResultObjectB instead of the aggregated List.

I've written a smaller test case with plain Java DSL, where I just process Strings, and want to aggregate those and return the aggregated exchange body:

ROUTE:

    public void testAggregation() throws Exception {
        cc.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                MyAggregationStrategy ag = new MyAggregationStrategy();
                from("direct:start").setHeader("ID").constant(1)
                    .multicast().setParallelProcessing(true)
                    .to("direct:stage1", "direct:stage2");
                from("direct:stage1").process(new LogProcessor("A")).to("direct:merge");
                from("direct:stage2").process(new LogProcessor("B")).to("direct:merge");
                from("direct:merge").aggregator(header("ID"), ag).batchSize(2)
                    .to("mock:end");
            }
        });
        String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID", 1);
        System.out.println("RESULT:\n" + res);
        assertEquals("A_test#B_test", res);
        MockEndpoint mock = (MockEndpoint) cc.getEndpoint("mock:end");
        mock.expectedMessageCount(1);
        mock.assertIsSatisfied();
    }

LogProcessor:

    public class LogProcessor implements Processor {
        private String name;

        public LogProcessor(String name) {
            this.name = name;
        }

        // Prefixes the body with this processor's name, e.g. "A_test".
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody(name + "_" + exchange.getIn().getBody());
        }
    }

MyAggregationStrategy:

    public class MyAggregationStrategy implements AggregationStrategy {
        private Integer count = 0;

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            count = (Integer) oldExchange.getIn().getHeader("aggregated");
            if (count == null)
                count = 1;
            count++;
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            Exchange copy = newExchange.copy();
            copy.getIn().setBody(oldBody + "#" + newBody);
            copy.getIn().setHeader("aggregated", count);
            System.out.println(copy.getIn());
            return copy;
        }

        public boolean isCompleted() {
            if (count != null)
                System.out.println("Aggregated? -> " + (getCount() == 2));
            return getCount() == 2;
        }

        public int getCount() {
            return count;
        }
    }

Here is the trace output:

07.11.2008 16:55:05 org.apache.camel.impl.DefaultCamelContext <init>
INFO: JMX is disabled. Using DefaultLifecycleStrategy.
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-0 Node: SetHeader[ID, 1] Headers: {ID=1} Body: test Pattern: InOut Time: 1226073306321 </TRACER>
07.11.2008 16:55:06 org.apache.camel.impl.converter.DefaultTypeConverter addTypeConverter
WARNUNG: Overriding type converter from: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(javax.xml.transform.Source) throws javax.xml.transform.TransformerException,java.io.IOException to: InstanceMethodTypeConverter: public java.lang.String org.apache.camel.converter.jaxp.XmlConverter.toString(javax.xml.transform.Source) throws javax.xml.transform.TransformerException
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-1 Node: To[direct:stage1] Headers: {ID=1} Body: test Pattern: InOut Time: 1226073306508 </TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-1 Node: [EMAIL PROTECTED] Headers: {ID=1} Body: test Pattern: InOut Time: 1226073306508 </TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-1 Node: To[direct:merge] Headers: {ID=1} Body: A_test Pattern: InOut Time: 1226073306508 </TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-2 Node: To[direct:stage2] Headers: {ID=1} Body: test Pattern: InOut Time: 1226073306524 </TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-2 Node: [EMAIL PROTECTED] Headers: {ID=1} Body: test Pattern: InOut Time: 1226073306524 </TRACER>
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-2 Node: To[direct:merge] Headers: {ID=1} Body: B_test Pattern: InOut Time: 1226073306524 </TRACER>
Message: A_test#B_test
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: <TRACER> ID: ID-WUM128085/4570-1226073306211/0-3 Node: To[mock:end] Headers: {aggregated=2, ID=1} Body: A_test#B_test Pattern: InOut Time: 1226073306524 </TRACER>

This test fails with

    junit.framework.ComparisonFailure: result ok? expected:<[A_test#]B_test> but was:<[]B_test>

although mock:end receives the correct body.

What do I have to do to receive A_test#B_test from my call

    String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID", 1);

?? Is it actually possible with direct endpoints?

thx,
Rob

Claus Ibsen wrote:
>
> Hi
>
> You can use the tracer to enable tracing of how the exchanges are routed in Camel.
> http://activemq.apache.org/camel/tracer.html
>
> And I suppose that you made a typo in this mail, as you wrote from("direct:a") twice:
> from("direct:a").process(new myProcessor()).bean(beanB, "methodB").to("direct:merge");
>
> That should be: from("direct:b")
>
> Best regards
>
> Claus Ibsen
> ......................................
> Silverbullet
> Skovsgårdsvænget 21
> 8362 Hørning
> Tel. +45 2962 7576
> Web: www.silverbullet.dk
>
> -----Original Message-----
> From: RobMMS [mailto:[EMAIL PROTECTED]
> Sent: 27 October 2008 13:15
> To: [email protected]
> Subject: ExchangePatterns on Bean-Component
>
> Hi,
> I'm observing a somewhat strange behaviour in my route:
>
> from("direct:start").multicast(new MyOutAggregationStrategy(), true).to("direct:a", "direct:b");
>
> from("direct:a").process(new myProcessor()).bean(beanA, "methodA").to("direct:merge");
> from("direct:a").process(new myProcessor()).bean(beanB, "methodB").to("direct:merge");
>
> from("direct:merge").
>     aggregator(header(QueryPreProcessor.HEADER_CORRELATION_ID), new ResultAggregationStrategy()).
>     completedPredicate(header("aggregated").isEqualTo(header("parallelStagesCount"))).
>     to("mock:end");
>
> List result = (List) template.sendBody("direct:start", ExchangePattern.InOut, myQueryString);
>
> What I want to do here is: I send a query String to direct:a, which then multicasts to my parallel working beans. Each bean takes a String value as parameter in methodA/B and returns a ResultObjectA/B, which is set as the body automatically (I do not set this explicitly in the beans).
> My ResultAggregationStrategy then puts ResultObjectA and ResultObjectB into a List, which I want to get back as the result of the template.sendBody call.
>
> The problem is that my result is most of the time the return value of the first bean being processed; only sometimes (like 10% of the time) does it return the aggregated list as expected.
>
> I would expect that my route processes each step, and when the aggregated message reaches mock:end, the message's Out body is returned to the caller -> result
> I'm not quite sure when I have to set the message's Out body and when it's ok to set the In body. I think that's the problem here, but I'm not sure.
>
> Maybe I'm completely wrong, so I hope you get what I need and can tell me what's wrong with my route ;)
>
> Thanks,
> Rob
> --
> View this message in context:
> http://www.nabble.com/ExchangePatterns-on-Bean-Component-tp20186540s22882p20186540.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

--
View this message in context: http://www.nabble.com/ExchangePatterns-on-Bean-Component-tp20186540s22882p20383351.html
Sent from the Camel - Users mailing list archive at Nabble.com.
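
As a footnote to Claus's answer at the top of the thread: the behaviour he describes can be modeled roughly in plain Java. This is only an analogy under stated assumptions, not Camel code. The `Aggregator` class, its `send` and `consume` methods, and the fixed batch size of 2 are hypothetical names chosen for illustration, and a real aggregator would do its work in a separate event-driven consumer rather than inline as here. The sketch only shows that the sender gets its own input back, while the aggregated body is available solely on the output side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-Java analogy only; none of these types are Camel API. */
final class AggregatorSketch {

    /** Hypothetical stand-in for an aggregator with a fixed batch size. */
    static final class Aggregator {
        private final int batchSize;
        private final List<String> pending = new ArrayList<>();
        // Completed batches are published here, like the output side
        // of the aggregator (mock:end in the thread's test).
        private final Queue<String> output = new ArrayDeque<>();

        Aggregator(int batchSize) {
            this.batchSize = batchSize;
        }

        /** Hands the body over and returns at once with the caller's own input. */
        synchronized String send(String body) {
            pending.add(body);
            if (pending.size() == batchSize) {
                output.add(String.join("#", pending));
                pending.clear();
            }
            return body; // never the aggregated result
        }

        /** Returns the next aggregated body, or null if no batch is complete yet. */
        synchronized String consume() {
            return output.poll();
        }
    }

    public static void main(String[] args) {
        Aggregator aggregator = new Aggregator(2);
        System.out.println(aggregator.send("A_test")); // prints A_test
        System.out.println(aggregator.send("B_test")); // prints B_test
        System.out.println(aggregator.consume());      // prints A_test#B_test
    }
}
```

Running `main` prints `A_test`, `B_test`, then `A_test#B_test`, which lines up with what Rob's test showed: the caller receives one of the input bodies, and only the consumer at the end of the route sees the aggregated one.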
