Sorry for the trouble. It seems the Aggregator might be the wrong EIP for my business case. I ran your sample and adapted it, but I am still not getting the desired result.
Here is what I'd like to achieve:

Client request -> remote web service 1 -> remote web service 1's response -> (client request + remote web service 1's response) -> remote web service 2 -> remote web service 2's response.

Someone mentioned the Content Enricher; is that more appropriate for this scenario? See the edited script:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" trace="true">
  <route>
    <from uri="jetty:http://0.0.0.0:9001/kimonoservice?matchOnUriPrefix=true" />
    <to uri="xslt:requestToSOAP.xsl"/>
    <wireTap uri="direct:tap"/>
    <to uri="xslt:KXMLRequestToTerminalManagerRequest.xsl"/>
    <convertBodyTo type="javax.xml.transform.dom.DOMSource" />
    <to uri="nmr:{http://services.locator/}TerminalServicesService:TerminalServicesPort"/>
    <convertBodyTo type="javax.xml.transform.stream.StreamSource" />
    <to uri="xslt:TerminalResponseToKXMLRequest.xsl"/>
    <to uri="direct:tap"/>
  </route>
  <route>
    <from uri="direct:tap"/>
    <aggregate batchSize="2" batchTimeout="5000" strategyRef="myStrategy">
      <correlationExpression><constant>true</constant></correlationExpression>
      <to uri="direct:aggregated"/>
    </aggregate>
  </route>
  <route>
    <from uri="direct:aggregated"/>
    <to uri="log:KimonoResponse3"/>
  </route>
</camelContext>

And here is the printout on my console:

22:31:59,421 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) xslt://requestToSOAP.xsl --> wireTap(direct://tap) <<< Pattern:InOut, Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010 21:31:59 GMT}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <kxml> <app>purchase</app> <terminfo> <tim>9:23:58</tim> <uid>M25155336EUH</uid> <csid>N/A</csid> <lang>EN</lang> </terminfo> <request> <amountText>230</amountText> <sellAccSelBox>current</sellAccSelBox> </request> </kxml> </soap:Body> </soap:Envelope>

22:31:59,421 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) wireTap(direct://tap) --> xslt://KXMLRequestToTerminalManagerRequest.xsl <<< Pattern:InOut, Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010 21:31:59 GMT}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <kxml> <app>purchase</app> <terminfo> <tim>9:23:58</tim> <uid>M25155336EUH</uid> <csid>N/A</csid> <lang>EN</lang> </terminfo> <request> <amountText>230</amountText> <sellAccSelBox>current</sellAccSelBox> </request> </kxml> </soap:Body> </soap:Envelope>

22:31:59,421 | INFO | ap[direct://tap] | Tracer | rg.apache.camel.processor.Logger 88 | 29e418ed-d5bc-49b9-99cc-d0f1deee5b4b >>> (route9) from(http://0.0.0.0:9001/kimonoservice) --> aggregate[true] <<< Pattern:InOnly, Headers:{Date=Thu, 28 Jan 2010 21:31:59 GMT, CamelHttpResponseCode=200}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <kxml> <app>purchase</app> <terminfo> <tim>9:23:58</tim> <uid>M25155336EUH</uid> <csid>N/A</csid> <lang>EN</lang> </terminfo> <request> <amountText>230</amountText> <sellAccSelBox>current</sellAccSelBox> </request> </kxml> </soap:Body> </soap:Envelope>

22:31:59,437 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) xslt://KXMLRequestToTerminalManagerRequest.xsl --> <<< Pattern:InOut, Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010 21:31:59 GMT}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <tm:TerminalRequest xmlns:tm="http://services.locator/"> <tm:uniqueId>M25155336EUH</tm:uniqueId> <tm:cellStationId>N/A</tm:cellStationId> <tm:batteryStatus/> <tm:terminalTime>9:23:58</tm:terminalTime> </tm:TerminalRequest> </soap:Body> </soap:Envelope>

22:31:59,437 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) --> nmr://{http://services.locator/}TerminalServicesService:TerminalServicesPort <<< Pattern:InOut, Headers:{Date=Thu, 28 Jan 2010 21:31:59 GMT, CamelHttpResponseCode=200}, BodyType:javax.xml.transform.dom.DOMSource, Body:<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <tm:TerminalRequest xmlns:tm="http://services.locator/"> <tm:uniqueId>M25155336EUH</tm:uniqueId> <tm:cellStationId>N/A</tm:cellStationId> <tm:batteryStatus/> <tm:terminalTime>9:23:58</tm:terminalTime> </tm:TerminalRequest> </soap:Body> </soap:Envelope>

22:32:00,671 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) nmr://{http://services.locator/}TerminalServicesService:TerminalServicesPort --> <<< Pattern:InOut, BodyType:javax.xml.transform.dom.DOMSource, Body:<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><_ns_:TerminalRequestResponse xmlns:_ns_="http://services.locator/"><return xmlns:tns="http://services.locator/" type="locator.dto.TerminalResponse"><code>-1</code><message>Unknown error</message></return></_ns_:TerminalRequestResponse></soap:Body></soap:Envelope>

22:32:00,687 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) --> xslt://TerminalResponseToKXMLRequest.xsl <<< Pattern:InOut, BodyType:org.apache.camel.converter.jaxp.StringSource, Body:<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><_ns_:TerminalRequestResponse xmlns:_ns_="http://services.locator/"><return xmlns:tns="http://services.locator/" type="locator.dto.TerminalResponse"><code>-1</code><message>Unknown error</message></return></_ns_:TerminalRequestResponse></soap:Body></soap:Envelope>

22:32:00,687 | INFO | 19967...@qtp9-1 | Tracer | 
rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route8) xslt://TerminalResponseToKXMLRequest.xsl --> direct://tap <<< Pattern:InOut, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <tm:TerminalResponse xmlns:tm="http://services.locator/"> <tm:code>-1</tm:code> <tm:message>Unknown error</tm:message> </tm:TerminalResponse> </soap:Body> </soap:Envelope>

22:32:00,687 | INFO | 19967...@qtp9-1 | Tracer | rg.apache.camel.processor.Logger 88 | 04a712d3-a734-4391-abf7-b42dc22e2629 >>> (route9) direct://tap --> aggregate[true] <<< Pattern:InOut, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?> <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> <soap:Body> <tm:TerminalResponse xmlns:tm="http://services.locator/"> <tm:code>-1</tm:code> <tm:message>Unknown error</tm:message> </tm:TerminalResponse> </soap:Body> </soap:Envelope>

22:45:55,453 | INFO | heckpoint Worker | MessageDatabase | emq.store.kahadb.MessageDatabase 605 | Slow KahaDB access: cleanup took 1985

My AggregationStrategy class looks like:

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange == null) {
        return newExchange;
    }
    LOG.info("old: " + oldExchange);
    LOG.info("new: " + newExchange);
    try {
        Message newIn = newExchange.getIn();
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newIn.getBody(String.class);

        DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder mergedDocumentbuilder = documentBuilderFactory.newDocumentBuilder();
        Document mergedDocument = mergedDocumentbuilder.newDocument();
        Element envelopeMergedElement = mergedDocument.createElement("soap:Envelope");
        envelopeMergedElement.setAttribute("xmlns:soap", "http://schemas.xmlsoap.org/soap/envelope/");
        Element bodyMergedElement = mergedDocument.createElement("soap:Body");
        Element mergedElement = mergedDocument.createElement("kxml");
        bodyMergedElement.appendChild(mergedElement);
        envelopeMergedElement.appendChild(bodyMergedElement);

        String xml1 = oldExchange.toString();
        xml1 = String.valueOf(xml1);
        int headerend = xml1.indexOf("?>"); // remove <?xml version="1.0" encoding="UTF-8"?>
        xml1 = xml1.substring(headerend + 2, xml1.length());
        xml1 = xml1.replaceAll("\r\n", "");
        xml1 = xml1.replaceAll("[", "");
        xml1 = xml1.replaceAll("]", "");
        xml1 = xml1.trim();
        LOG.info("Normalized xml1: " + xml1);

        String xml2 = newExchange.toString();
        LOG.info("Data 2: " + String.valueOf(xml2));
        xml2 = String.valueOf(xml2);
        headerend = xml2.indexOf("?>"); // remove <?xml version="1.0" encoding="UTF-8"?>
        xml2 = xml2.substring(headerend + 2, xml2.length());
        xml2 = xml2.replaceAll("\r\n", "");
        xml2 = xml2.replaceAll("[", "");
        xml2 = xml2.replaceAll("]", "");
        xml2 = xml2.trim();
        LOG.info("New Normalized xml2: " + xml2);

        DocumentBuilder oldDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
        Document oldDocument = oldDocumentBuilder.parse(new InputSource(new StringReader(String.valueOf(xml1))));
        Element oldElement = oldDocument.getDocumentElement();
        NodeList oldList = oldElement.getChildNodes();
        if (oldList != null && oldList.getLength() > 0) {
            LOG.info("Node length: " + oldList.getLength());
            for (int i = 0; i < oldList.getLength(); i++) {
                Element el = (Element) oldList.item(i);
                Node newNode = mergedDocument.importNode(el, true);
                LOG.info("Node key: " + el.getNodeName());
                LOG.info("Node value: " + el.getTextContent());
                mergedElement.appendChild(newNode);
            }
        }

        DocumentBuilder newDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
        Document newDocument = newDocumentBuilder.parse(new InputSource(new StringReader(String.valueOf(xml2))));
        Element newElement = newDocument.getDocumentElement();
        NodeList newList = newElement.getChildNodes();
        if (newList != null && newList.getLength() > 0) {
            LOG.info("New Node length: " + newList.getLength());
            for (int i = 0; i < newList.getLength(); i++) {
                Element el = (Element) newList.item(i);
                Node newNode = mergedDocument.importNode(el, true);
                LOG.info("New Node key: " + el.getNodeName());
                LOG.info("New Node value: " + el.getTextContent());
                mergedElement.appendChild(newNode);
            }
        }

        newIn.setBody(envelopeMergedElement);
        LOG.info("New Exchange: " + newExchange);
    } catch (Exception e) {
        // TODO: handle exception
    }
    return newExchange;
}

Thanks for your time.

Claus Ibsen-2 wrote:
>
> On Sun, Jan 24, 2010 at 7:59 PM, lekkie <[email protected]> wrote:
>>
>> I tried this but it did not work.
>>
>> Something strange however, happens. I noticed if I use the default
>> aggregationStrategy, the log:Response1 will be logged (as null), however if
>> I changed it to myAggregatorStrategy, it never prints the log (let alone
>> log:Response3), instead it prints log:Response2 which prints the old and new
>> exchange separately (at different times).
>>
>> <route>
>>   <from uri="direct:AggregatorServices" />
>>   <aggregate strategyRef="myAggregatorStrategy" batchSize="2" batchTimeout="30000">
>>   <!-- <aggregate batchSize="2" batchTimeout="30000" groupExchanges="true">-->
>>     <correlationExpression>
>>       <constant>true</constant>
>>     </correlationExpression>
>>     <to uri="log:Response1"/>
>>     <from uri="direct:ProcessorServices" />
>>   </aggregate>
>>   <to uri="log:Response2"/>
>> </route>
>> <route>
>>   <from uri="direct:ProcessorServices" />
>>   <to uri="log:KimonoResponse3"/>
>> </route>
>>
>> Something is wrong somewhere, just can't figure it out yet.
>>
>
> See this unit test
> http://svn.apache.org/viewvc?rev=903849&view=rev
>
> You cannot do a <from> inside the route! From are used to define
> inputs to the route and not in the middle of the route.
> Use the unit test as a base to get your stuff working.
>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> Hi
>>>
>>> I have created a unit test which does what you want
>>> http://svn.apache.org/viewvc?rev=902559&view=rev
>>>
>>> You should use batchSize = 2 and not OUT batch size. OUT batch size
>>> is for when you have multiple correlation groups,
>>> which you do not have since you use true as the correlation id.
>>>
>>> Also note that the default timeout of 1 sec will kick in, so if you
>>> send in messages at a slow pace then set it to a higher number.
>>>
>>> The aggregator will be overhauled in the future, hopefully in 2.3
>>>
>>>
>>> On Fri, Jan 22, 2010 at 10:05 AM, lekkie <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I want to combine messages from 2 different exchanges into a single
>>>> exchange; from the EIP patterns, the aggregation strategy seems to be
>>>> the right pattern to use.
>>>>
>>>> However, after reading the doc (http://camel.apache.org/aggregator.html),
>>>> I followed the instructions but my aggregation strategy only returns the
>>>> last exchange, even though I specifically configured it to send messages
>>>> only when the outbatchsize is 2.
>>>>
>>>> I implemented a custom aggregationstrategy which should combine my
>>>> exchanges into one. I log the event in the class and it works fine; what
>>>> is beyond me is why it returns only the last exchange.
>>>>
>>>>
>>>> See my config here:
>>>> <bean id="myAggregatorStrategy" class="org.tempuri.MyAggregationStrategy"/>
>>>>
>>>> <osgi:camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
>>>>   <route>
>>>>     <from uri="direct:RequestProcessor" />
>>>>     <to uri="xslt:requestToSOAP.xsl"/>
>>>>     <wireTap uri="direct:AggregatorServices"/>
>>>>     <to uri="xslt:requestToManager.xsl"/>
>>>>     <convertBodyTo type="javax.xml.transform.dom.DOMSource" />
>>>>     <to uri="nmr:{http://services.locator/}Service:ServicesPort"/>
>>>>     <to uri="direct:AggregatorServices"/>
>>>>   </route>
>>>>
>>>>   <route>
>>>>     <from uri="direct:AggregatorServices" />
>>>>     <aggregate strategyRef="myAggregatorStrategy" outBatchSize="2">
>>>>       <correlationExpression>
>>>>         <constant>true</constant>
>>>>       </correlationExpression>
>>>>       <to uri="direct:ProcessorServices"/>
>>>>     </aggregate>
>>>>   </route>
>>>>
>>>>   <route>
>>>>     <from uri="direct:ProcessorServices" />
>>>>     <to uri="log:Response"/>
>>>>   </route>
>>>> </osgi:camelContext>
>>>>
>>>> log:Response only prints out the response from
>>>> <to uri="nmr:{http://services.locator/}Service:ServicesPort"/>.
>>>>
>>>> Meanwhile, the log inside myStratRef (myAggregatorStrategy), which I
>>>> asked to concatenate the old & new exchanges, shows both exchanges were
>>>> concatenated.
>>>> How do I get this concatenated exchange to be sent (to log:Response)?
>>>>
>>>> Regards.
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27270355.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>
>>> --
>>> Claus Ibsen
>>> Apache Camel Committer
>>>
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>> Open Source Integration: http://fusesource.com
>>> Blog: http://davsclaus.blogspot.com/
>>> Twitter: http://twitter.com/davsclaus
>>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27297994.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>

--
View this message in context:
http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27370609.html
Sent from the Camel - Users mailing list archive at Nabble.com.

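A side note on the AggregationStrategy posted above, offered as a sketch and not as a fix verified against this setup. `String.replaceAll` treats its first argument as a regular expression, so `replaceAll("[", "")` throws a `PatternSyntaxException`, and casting every child node to `Element` fails on whitespace text nodes. The empty catch block would hide both and leave `newExchange` unchanged, which may be why only the last exchange shows up. The example below uses JDK classes only. The class name `SoapBodyMergeSketch`, its method names, and the choice to copy each payload's child elements under a single `kxml` element are assumptions made for illustration. It also assumes the inputs are the message bodies (for example from `getIn().getBody(String.class)`), not the output of `Exchange.toString()`.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Camel: merges two SOAP bodies with JDK classes only.
final class SoapBodyMergeSketch {
    static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";

    /** Builds soap:Envelope/soap:Body/kxml holding the payload child elements of both inputs. */
    static Document merge(String firstXml, String secondXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed so soap: and tm: prefixes resolve
            DocumentBuilder builder = factory.newDocumentBuilder();

            Document merged = builder.newDocument();
            Element envelope = merged.createElementNS(SOAP_NS, "soap:Envelope");
            Element body = merged.createElementNS(SOAP_NS, "soap:Body");
            Element kxml = merged.createElementNS(null, "kxml");
            merged.appendChild(envelope); // attach the root so the Document is complete
            envelope.appendChild(body);
            body.appendChild(kxml);

            copyPayloadChildren(builder.parse(new InputSource(new StringReader(firstXml))), kxml);
            copyPayloadChildren(builder.parse(new InputSource(new StringReader(secondXml))), kxml);
            return merged;
        } catch (Exception e) {
            // Surface the failure instead of swallowing it.
            throw new IllegalStateException("Could not merge SOAP bodies", e);
        }
    }

    /** Imports the element children of each payload element under soap:Body into target. */
    static void copyPayloadChildren(Document source, Element target) {
        Node soapBody = source.getElementsByTagNameNS(SOAP_NS, "Body").item(0);
        if (soapBody == null) {
            return;
        }
        for (Node payload = soapBody.getFirstChild(); payload != null; payload = payload.getNextSibling()) {
            if (payload.getNodeType() != Node.ELEMENT_NODE) {
                continue; // skip whitespace text nodes instead of casting them
            }
            for (Node child = payload.getFirstChild(); child != null; child = child.getNextSibling()) {
                if (child.getNodeType() == Node.ELEMENT_NODE) {
                    target.appendChild(target.getOwnerDocument().importNode(child, true));
                }
            }
        }
    }

    /** Serializes the merged document to a String without an XML declaration. */
    static String toXml(Document document) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(document), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Could not serialize document", e);
        }
    }

    public static void main(String[] args) {
        String request = "<soap:Envelope xmlns:soap=\"" + SOAP_NS + "\"><soap:Body>"
                + "<kxml><app>purchase</app></kxml></soap:Body></soap:Envelope>";
        String response = "<soap:Envelope xmlns:soap=\"" + SOAP_NS + "\"><soap:Body>"
                + "<tm:TerminalResponse xmlns:tm=\"http://services.locator/\">"
                + "<tm:code>-1</tm:code></tm:TerminalResponse></soap:Body></soap:Envelope>";
        System.out.println(toXml(merge(request, response)));
    }
}
```

Inside an AggregationStrategy, the Document returned by `merge` could be set as the body of the exchange that the strategy returns. Whether this flat `kxml` layout is what the next XSLT step expects is something only the stylesheet can confirm.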