Sorry for the trouble.

It seems the Aggregator might be the wrong EIP for my business case. I ran
your sample and adapted it, but I am still not getting the desired result.

Here is what I'd like to achieve:

Client request -> remote web service 1 -> remote web service 1's response ->
(client request + remote web service 1's response) -> remote web service 2 ->
remote web service 2's response.

Someone mentioned the Content Enricher; is that more appropriate for this
scenario?
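
For illustration, the flow above has the shape of the Content Enricher pattern: call a resource, merge its reply into the original message, then pass the merged message on. Below is a minimal, Camel-free sketch in plain Java, assuming each web service can be modelled as a function from a request string to a response string. The names `EnricherSketch`, `enrich`, `process`, `ws1` and `ws2` are hypothetical and are not Camel APIs.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Plain-Java illustration of the Content Enricher flow; no Camel types are used. */
final class EnricherSketch {

    /**
     * Sends the original message to a resource and merges the reply back
     * into the original, so that both survive for the next step.
     */
    static String enrich(String original,
                         Function<String, String> resource,
                         BinaryOperator<String> merge) {
        String reply = resource.apply(original);
        return merge.apply(original, reply);
    }

    /** request -> ws1 -> (request + ws1 reply) -> ws2 -> ws2 reply */
    static String process(String request,
                          Function<String, String> ws1,
                          Function<String, String> ws2) {
        // The merge here is plain concatenation; a real merge would combine XML.
        String enriched = enrich(request, ws1, (req, resp) -> req + resp);
        return ws2.apply(enriched);
    }

    public static void main(String[] args) {
        // Stand-ins for the two remote services (hypothetical behaviour).
        Function<String, String> ws1 = req -> "<code>0</code>";
        Function<String, String> ws2 = req -> "<received>" + req + "</received>";
        System.out.println(process("<amountText>230</amountText>", ws1, ws2));
    }
}
```

In Camel, the Content Enricher DSL (`enrich`) takes an AggregationStrategy to perform this merge, and it runs inline in the route, so no batch size or timeout is involved.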

Here is the edited route configuration:


<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" trace="true">
    <route>
        <from uri="jetty:http://0.0.0.0:9001/kimonoservice?matchOnUriPrefix=true"/>
        <to uri="xslt:requestToSOAP.xsl"/>
        <wireTap uri="direct:tap"/>
        <to uri="xslt:KXMLRequestToTerminalManagerRequest.xsl"/>
        <convertBodyTo type="javax.xml.transform.dom.DOMSource"/>
        <to uri="nmr:{http://services.locator/}TerminalServicesService:TerminalServicesPort"/>
        <convertBodyTo type="javax.xml.transform.stream.StreamSource"/>
        <to uri="xslt:TerminalResponseToKXMLRequest.xsl"/>
        <to uri="direct:tap"/>
    </route>

    <route>
        <from uri="direct:tap"/>
        <aggregate batchSize="2" batchTimeout="5000" strategyRef="myStrategy">
            <correlationExpression><constant>true</constant></correlationExpression>
            <to uri="direct:aggregated"/>
        </aggregate>
    </route>

    <route>
        <from uri="direct:aggregated"/>
        <to uri="log:KimonoResponse3"/>
    </route>
</camelContext>


And here is the console output:

22:31:59,421 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8) xslt://requestToSOAP.xsl --> wireTap(direct://tap) <<<
Pattern:InOut, Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010
21:31:59 GMT}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<kxml>
<app>purchase</app>
<terminfo>
<tim>9:23:58</tim>
<uid>M25155336EUH</uid>
<csid>N/A</csid>
<lang>EN</lang>
</terminfo>
<request>
<amountText>230</amountText>
<sellAccSelBox>current</sellAccSelBox>
</request>
</kxml>
</soap:Body>
</soap:Envelope>

22:31:59,421 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8) wireTap(direct://tap) -->
xslt://KXMLRequestToTerminalManagerRequest.xsl <<< Pattern:InOut,
Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010 21:31:59 GMT},
BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<kxml>
<app>purchase</app>
<terminfo>
<tim>9:23:58</tim>
<uid>M25155336EUH</uid>
<csid>N/A</csid>
<lang>EN</lang>
</terminfo>
<request>
<amountText>230</amountText>
<sellAccSelBox>current</sellAccSelBox>
</request>
</kxml>
</soap:Body>
</soap:Envelope>

22:31:59,421 | INFO  | ap[direct://tap] | Tracer                           |
rg.apache.camel.processor.Logger   88 | 29e418ed-d5bc-49b9-99cc-d0f1deee5b4b
>>> (route9) from(http://0.0.0.0:9001/kimonoservice) --> aggregate[true] <<<
Pattern:InOnly, Headers:{Date=Thu, 28 Jan 2010 21:31:59 GMT,
CamelHttpResponseCode=200}, BodyType:String, Body:<?xml version="1.0"
encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<kxml>
<app>purchase</app>
<terminfo>
<tim>9:23:58</tim>
<uid>M25155336EUH</uid>
<csid>N/A</csid>
<lang>EN</lang>
</terminfo>
<request>
<amountText>230</amountText>
<sellAccSelBox>current</sellAccSelBox>
</request>
</kxml>
</soap:Body>
</soap:Envelope>

22:31:59,437 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8) xslt://KXMLRequestToTerminalManagerRequest.xsl -->  <<<
Pattern:InOut, Headers:{CamelHttpResponseCode=200, Date=Thu, 28 Jan 2010
21:31:59 GMT}, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<tm:TerminalRequest xmlns:tm="http://services.locator/">
<tm:uniqueId>M25155336EUH</tm:uniqueId>
<tm:cellStationId>N/A</tm:cellStationId>
<tm:batteryStatus/>
<tm:terminalTime>9:23:58</tm:terminalTime>
</tm:TerminalRequest>
</soap:Body>
</soap:Envelope>

22:31:59,437 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8)  -->
nmr://{http://services.locator/}TerminalServicesService:TerminalServicesPort
<<< Pattern:InOut, Headers:{Date=Thu, 28 Jan 2010 21:31:59 GMT,
CamelHttpResponseCode=200}, BodyType:javax.xml.transform.dom.DOMSource,
Body:<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<tm:TerminalRequest xmlns:tm="http://services.locator/">
<tm:uniqueId>M25155336EUH</tm:uniqueId>
<tm:cellStationId>N/A</tm:cellStationId>
<tm:batteryStatus/>
<tm:terminalTime>9:23:58</tm:terminalTime>
</tm:TerminalRequest>
</soap:Body>
</soap:Envelope>
22:32:00,671 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8)
nmr://{http://services.locator/}TerminalServicesService:TerminalServicesPort
-->  <<< Pattern:InOut, BodyType:javax.xml.transform.dom.DOMSource,
Body:<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><_ns_:TerminalRequestResponse
xmlns:_ns_="http://services.locator/"><return
xmlns:tns="http://services.locator/"
type="locator.dto.TerminalResponse"><code>-1</code><message>Unknown
error</message></return></_ns_:TerminalRequestResponse></soap:Body></soap:Envelope>
22:32:00,687 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8)  --> xslt://TerminalResponseToKXMLRequest.xsl <<<
Pattern:InOut, BodyType:org.apache.camel.converter.jaxp.StringSource,
Body:<soap:Envelope
xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"><soap:Body><_ns_:TerminalRequestResponse
xmlns:_ns_="http://services.locator/"><return
xmlns:tns="http://services.locator/"
type="locator.dto.TerminalResponse"><code>-1</code><message>Unknown
error</message></return></_ns_:TerminalRequestResponse></soap:Body></soap:Envelope>
22:32:00,687 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route8) xslt://TerminalResponseToKXMLRequest.xsl --> direct://tap <<<
Pattern:InOut, BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<tm:TerminalResponse xmlns:tm="http://services.locator/">
<tm:code>-1</tm:code>
<tm:message>Unknown error</tm:message>
</tm:TerminalResponse>
</soap:Body>
</soap:Envelope>

22:32:00,687 | INFO  | 19967...@qtp9-1  | Tracer                           |
rg.apache.camel.processor.Logger   88 | 04a712d3-a734-4391-abf7-b42dc22e2629
>>> (route9) direct://tap --> aggregate[true] <<< Pattern:InOut,
BodyType:String, Body:<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<tm:TerminalResponse xmlns:tm="http://services.locator/">
<tm:code>-1</tm:code>
<tm:message>Unknown error</tm:message>
</tm:TerminalResponse>
</soap:Body>
</soap:Envelope>

22:45:55,453 | INFO  | heckpoint Worker | MessageDatabase                  |
emq.store.kahadb.MessageDatabase  605 | Slow KahaDB access: cleanup took
1985


The aggregate method of my AggregationStrategy class looks like this:

public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
{
    if (oldExchange == null)
    {
        return newExchange;
    }

    LOG.info("old: " + oldExchange);
    LOG.info("new: " + newExchange);

    try
    {
        Message newIn = newExchange.getIn();
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newIn.getBody(String.class);

        DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder mergedDocumentbuilder = documentBuilderFactory.newDocumentBuilder();
        Document mergedDocument = mergedDocumentbuilder.newDocument();
        Element envelopeMergedElement = mergedDocument.createElement("soap:Envelope");
        envelopeMergedElement.setAttribute("xmlns:soap", "http://schemas.xmlsoap.org/soap/envelope/");
        Element bodyMergedElement = mergedDocument.createElement("soap:Body");
        Element mergedElement = mergedDocument.createElement("kxml");
        bodyMergedElement.appendChild(mergedElement);
        envelopeMergedElement.appendChild(bodyMergedElement);

        String xml1 = oldExchange.toString();
        xml1 = String.valueOf(xml1);
        int headerend = xml1.indexOf("?>"); // remove <?xml version="1.0" encoding="UTF-8"?>
        xml1 = xml1.substring(headerend + 2, xml1.length());
        xml1 = xml1.replaceAll("\r\n", "");
        xml1 = xml1.replaceAll("[", "");
        xml1 = xml1.replaceAll("]", "");
        xml1 = xml1.trim();
        LOG.info("Normalized xml1: " + xml1);

        String xml2 = newExchange.toString();
        LOG.info("Data 2: " + String.valueOf(xml2));
        xml2 = String.valueOf(xml2);
        headerend = xml2.indexOf("?>"); // remove <?xml version="1.0" encoding="UTF-8"?>
        xml2 = xml2.substring(headerend + 2, xml2.length());
        xml2 = xml2.replaceAll("\r\n", "");
        xml2 = xml2.replaceAll("[", "");
        xml2 = xml2.replaceAll("]", "");
        xml2 = xml2.trim();
        LOG.info("New Normalized xml2: " + xml2);

        DocumentBuilder oldDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
        Document oldDocument = oldDocumentBuilder.parse(new InputSource(new StringReader(String.valueOf(xml1))));
        Element oldElement = oldDocument.getDocumentElement();
        NodeList oldList = oldElement.getChildNodes();
        if (oldList != null && oldList.getLength() > 0)
        {
            LOG.info("Node length: " + oldList.getLength());
            for (int i = 0; i < oldList.getLength(); i++)
            {
                Element el = (Element) oldList.item(i);
                Node newNode = mergedDocument.importNode(el, true);
                LOG.info("Node key: " + el.getNodeName());
                LOG.info("Node value: " + el.getTextContent());
                mergedElement.appendChild(newNode);
            }
        }

        DocumentBuilder newDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
        Document newDocument = newDocumentBuilder.parse(new InputSource(new StringReader(String.valueOf(xml2))));
        Element newElement = newDocument.getDocumentElement();
        NodeList newList = newElement.getChildNodes();
        if (newList != null && newList.getLength() > 0)
        {
            LOG.info("New Node length: " + newList.getLength());
            for (int i = 0; i < newList.getLength(); i++)
            {
                Element el = (Element) newList.item(i);
                Node newNode = mergedDocument.importNode(el, true);
                LOG.info("New Node key: " + el.getNodeName());
                LOG.info("New Node value: " + el.getTextContent());
                mergedElement.appendChild(newNode);
            }
        }

        newIn.setBody(envelopeMergedElement);

        LOG.info("New Exchange: " + newExchange);
    }
    catch (Exception e) {
        // TODO: handle exception
    }

    return newExchange;
}
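
For comparison, here is a minimal sketch of the merge step on its own, using only the JDK's DOM and transformer APIs and assuming the two message bodies are SOAP envelopes like the ones in the trace above. `SoapMergeSketch` and `merge` are hypothetical names, and the `kxml` wrapper simply mirrors the method above. The sketch parses the body strings directly (the parser accepts the XML declaration, so nothing is stripped by hand) and copies only element children, because `getChildNodes()` also returns whitespace text nodes, which cannot be cast to `Element`. One thing that may be worth checking in the method above: `String.replaceAll` takes a regular expression, so the pattern `"["` throws `PatternSyntaxException`, and an empty catch block would hide that.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper: merges the SOAP Body contents of two envelopes into one. */
final class SoapMergeSketch {
    static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";
    static final String XMLNS_NS = "http://www.w3.org/2000/xmlns/";

    static String merge(String oldXml, String newXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();

            // Build the target: soap:Envelope / soap:Body / kxml.
            Document merged = builder.newDocument();
            Element envelope = merged.createElementNS(SOAP_NS, "soap:Envelope");
            envelope.setAttributeNS(XMLNS_NS, "xmlns:soap", SOAP_NS);
            Element body = merged.createElementNS(SOAP_NS, "soap:Body");
            Element kxml = merged.createElementNS(null, "kxml");
            merged.appendChild(envelope);
            envelope.appendChild(body);
            body.appendChild(kxml);

            for (String xml : new String[] {oldXml, newXml}) {
                Document doc = builder.parse(new InputSource(new StringReader(xml)));
                Node sourceBody = doc.getElementsByTagNameNS(SOAP_NS, "Body").item(0);
                if (sourceBody == null) {
                    continue; // not a SOAP envelope: nothing to copy
                }
                NodeList children = sourceBody.getChildNodes();
                for (int i = 0; i < children.getLength(); i++) {
                    Node child = children.item(i);
                    // Skip whitespace text nodes and comments; copy elements only.
                    if (child.getNodeType() == Node.ELEMENT_NODE) {
                        kxml.appendChild(merged.importNode(child, true));
                    }
                }
            }

            // Serialize the merged document back to a String body.
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(merged), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            // Surface the failure instead of silently returning unmerged data.
            throw new IllegalStateException("Could not merge SOAP bodies", e);
        }
    }
}
```

Inside an AggregationStrategy, the inputs would presumably be `oldExchange.getIn().getBody(String.class)` and `newExchange.getIn().getBody(String.class)`, with the returned string set as the new body.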


Thanks for your time.






Claus Ibsen-2 wrote:
> 
> On Sun, Jan 24, 2010 at 7:59 PM, lekkie <[email protected]> wrote:
>>
>> I tried this but it did not work.
>>
>> Something strange happens, however. If I use the default
>> aggregationStrategy, log:Response1 is logged (as null). If I change it
>> to myAggregatorStrategy, it never prints that log (let alone
>> log:Response3); instead it prints log:Response2, which shows the old and
>> new exchanges separately (at different times).
>>
>>  <route>
>>        <from uri="direct:AggregatorServices" />
>>        <aggregate strategyRef="myAggregatorStrategy" batchSize="2"
>> batchTimeout="30000">
>>        <!-- <aggregate batchSize="2" batchTimeout="30000"
>> groupExchanges="true">-->
>>                <correlationExpression>
>>                        <constant>true</constant>
>>                    </correlationExpression>
>>                    <to uri="log:Response1"/>
>>                    <from uri="direct:ProcessorServices" />
>>            </aggregate>
>>            <to uri="log:Response2"/>
>> </route>
>> <route>
>>        <from uri="direct:ProcessorServices" />
>>                <to uri="log:KimonoResponse3"/>
>>    </route>
>>
>> Something is wrong somewhere, just can't figure it out yet.
>>
>>
> 
> See this unit test
> http://svn.apache.org/viewvc?rev=903849&view=rev
> 
> You cannot put a <from> inside the route! A <from> defines an
> input to the route; it cannot be used in the middle of the route.
> Use the unit test as a base to get your stuff working.
> 
> 
> 
>>
>> Claus Ibsen-2 wrote:
>>>
>>> Hi
>>>
>>> I have created a unit test which does what you want
>>> http://svn.apache.org/viewvc?rev=902559&view=rev
>>>
>>> You should use batchSize = 2 and not outBatchSize. outBatchSize
>>> is for when you have multiple correlation groups,
>>> which you do not have since you use true as the correlation id.
>>>
>>> Also note that the default timeout of 1 sec will kick in, so if you
>>> send in messages at a slow pace, set it to a higher number.
>>>
>>> The aggregator will be overhauled in the future, hopefully in 2.3
>>>
>>>
>>> On Fri, Jan 22, 2010 at 10:05 AM, lekkie <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I want to combine messages from 2 different exchanges into a single
>>>> exchange. From the EIP patterns, the aggregator seems to be the right
>>>> pattern to use.
>>>>
>>>> However, after reading the doc (http://camel.apache.org/aggregator.html)
>>>> and following the instructions, my aggregation strategy only returns the
>>>> last exchange, even though I specifically configured it to send messages
>>>> only when the outBatchSize is 2.
>>>>
>>>> I implemented a custom AggregationStrategy which should combine my
>>>> exchanges into one. I log the event in the class and it works fine; what
>>>> is beyond me is why it returns only the last exchange.
>>>>
>>>>
>>>> See my config here:
>>>> <bean id="myAggregatorStrategy"
>>>> class="org.tempuri.MyAggregationStrategy"/>
>>>>
>>>> <osgi:camelContext xmlns="http://camel.apache.org/schema/spring"
>>>> trace="true">
>>>> <route>
>>>>        <from uri="direct:RequestProcessor" />
>>>>        <to uri="xslt:requestToSOAP.xsl"/>
>>>>        <wireTap uri="direct:AggregatorServices"/>
>>>>        <to uri="xslt:requestToManager.xsl"/>
>>>>        <convertBodyTo type="javax.xml.transform.dom.DOMSource" />
>>>>        <to uri="nmr:{http://services.locator/}Service:ServicesPort"/>
>>>>        <to uri="direct:AggregatorServices"/>
>>>> </route>
>>>>
>>>> <route>
>>>>        <from uri="direct:AggregatorServices" />
>>>>        <aggregate strategyRef="myAggregatorStrategy" outBatchSize="2">
>>>>                <correlationExpression>
>>>>                        <constant>true</constant>
>>>>                </correlationExpression>
>>>>                <to uri="direct:ProcessorServices"/>
>>>>         </aggregate>
>>>> </route>
>>>>
>>>>  <route>
>>>>        <from uri="direct:ProcessorServices" />
>>>>        <to uri="log:Response"/>
>>>> </route>
>>>> </osgi:camelContext>
>>>>
>>>> log:Response only prints the response from
>>>> <to uri="nmr:{http://services.locator/}Service:ServicesPort"/>.
>>>>
>>>> Meanwhile, the log inside myAggregatorStrategy, which I ask to
>>>> concatenate the old and new exchanges, shows both exchanges were
>>>> concatenated. How do I get this concatenated exchange sent on (to
>>>> log:Response)?
>>>>
>>>> Regards.
>>>> --
>>>> View this message in context:
>>>> http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27270355.html
>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> Apache Camel Committer
>>>
>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>> Open Source Integration: http://fusesource.com
>>> Blog: http://davsclaus.blogspot.com/
>>> Twitter: http://twitter.com/davsclaus
>>>
>>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27297994.html
>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> 
> 

-- 
View this message in context: 
http://old.nabble.com/Aggregator%27s-not-returning-combined-exchanges%2C-only-returns-last-exchange-tp27270355p27370609.html
Sent from the Camel - Users mailing list archive at Nabble.com.
