Hmmm. Maybe others can help with that if it's possible, I haven't had to wrestle with it.
In my case it is actually a CXF service too, but it's asynchronous and I send the response once I have it, indicating either timeout or the actual response.
Sorry I responded to your question without going back to see your other posts.

Taariq

On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:

> In my case, the originating request comes from CXF.  How do I send the
> aggregated response back to CXF?
>
> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>> The consumer that handles the aggregated/timed-out request or response.
>>
>> I have to resend a few times if it's the request, I simply feed it back into
>> "direct:socketRequestRoute" with the header for the number of retry attempts
>> incremented.
>> If it's the response I can forward to some process.
>>
>> Taariq
>>
>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>
>>> What's listening on the:
>>>
>>> to("direct:requestResponse")
>>>
>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> Sure
>>>>
>>>> You can of course solve what I've described many ways, but I'll
>>>> explain using 3 routes as that's what I used.
>>>>
>>>> This first route is the main route I mentioned earlier, so you send
>>>> your socket messages here and it's multicast to both the aggregator
>>>> and to the socket.
>>>>
>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>     "someOutboundSocketEndpoint");
>>>>
>>>>
>>>> This next route will aggregate, both requests and responses are sent
>>>> here as you envisaged.
>>>> from("direct:requestResponseAggregator")
>>>>     .aggregate(header("someCorrellationId"),
>>>>         requestResponseAggregator)
>>>>     .completionSize(2)
>>>>     .completionTimeout(5000)
>>>>     .to("direct:requestResponse");
>>>> // Here you can send the "aggregated" message, in my case it's only the
>>>> // response I forward unless there's a timeout, then I forward the request
>>>> // of course.
>>>>
>>>> Finally the route that consumes the socket responses.
>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>> // this headerEnricher doesn't have to be a processor, you have many
>>>> // options to add a header.
>>>>
>>>> If that's not clear feel free to ask.
>>>>
>>>> Taariq
>>>>
>>>>
>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>> <ja...@carmanconsulting.com> wrote:
>>>>> Care to share an example?  I'm not picturing it.
>>>>>
>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>>> Hi James
>>>>>>
>>>>>> I did that too for what it's worth.
>>>>>> I send the message to a route that forwards to both the aggregator and
>>>>>> to the socket.
>>>>>> When the response comes in I use an enricher to add the ID to the
>>>>>> headers and then forward to the aggregator.
>>>>>>
>>>>>> Taariq
>>>>>>
>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Willem,
>>>>>>>
>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> James
>>>>>>>
>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com>
>>>>>>> wrote:
>>>>>>>> Hi James,
>>>>>>>>
>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>>
>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>
>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>
>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi James,
>>>>>>>>>>
>>>>>>>>>> I hope I understand your scenario correctly. Here are a few
>>>>>>>>>> thoughts. I assume you want to use camel-netty [1] to send messages
>>>>>>>>>> to your server (if you have your own code that does that, you can
>>>>>>>>>> use it too, but you'd have to write your own Processor or
>>>>>>>>>> Component). IIUC, your scenario is converting a 2x in-only to a 1x
>>>>>>>>>> in-out async MEP. You should then treat your exchange as an async
>>>>>>>>>> in-out and let your framework (Camel) decompose it and compose it
>>>>>>>>>> back again. I would not keep threads blocked so I believe your best
>>>>>>>>>> bet is using the Camel async messaging [2] and Futures (look at the
>>>>>>>>>> examples using asyncSend* and asyncCallback*). The issue is that
>>>>>>>>>> Camel is stateless so you'll need a correlationId, which you must
>>>>>>>>>> have already, and something to keep your state. A good bet would be
>>>>>>>>>> JMS [3], or you could write your own. If you used JMS you would need
>>>>>>>>>> to use both a correlationId and a replyTo queue.
>>>>>>>>>>
>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Perhaps a bit more information might be appropriate here.
>>>>>>>>> Eventually, I'd like to "expose" this route via web services (using
>>>>>>>>> CXF of course).  So, I would need to either block the request thread,
>>>>>>>>> waiting for a reply, or perhaps check out the new Servlet 3.0
>>>>>>>>> asynchronous processing stuff (I'm thinking this might help us get
>>>>>>>>> more done with fewer HTTP request threads) to do more of a
>>>>>>>>> continuation thing.
>>>>>>>>>
>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>>
>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot
>>>>>>>>>> use the same you can do a second transformation/correlation using a
>>>>>>>>>> claim-check sort of pattern. If you don't want to use JMS you can
>>>>>>>>>> implement your own (in memory) persistence and correlation. You can
>>>>>>>>>> also use a resequencer [4] if you want to enforce the order. If you
>>>>>>>>>> use asyncCallback, you get the replies when they become available,
>>>>>>>>>> and you can control that.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought,
>>>>>>>>>> but I hope this helps.
>>>>>>>>>> Hadrian
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Willem
>>>>>>>> ----------------------------------
>>>>>>>> FuseSource
>>>>>>>> Web: http://www.fusesource.com
>>>>>>>> Blog: http://willemjiang.blogspot.com (English)
>>>>>>>>       http://jnn.javaeye.com (Chinese)
>>>>>>>> Twitter: willemjiang
>>>>>>>> Weibo: willemjiang
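
For anyone reading this thread later: the thread suggests implementing your own in-memory correlation, matching each asynchronous response to its request by correlation id and giving up after a timeout. That idea can be sketched in plain Java, outside Camel. This is only an illustration under stated assumptions, not Camel API and not the code anyone in this thread actually ran. `PendingReplies`, `register` and `complete` are hypothetical names, payloads are assumed to be plain strings, and `orTimeout` needs Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Camel): pairs async replies with requests by correlation id. */
final class PendingReplies {
    // One future per in-flight request, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /**
     * Call before sending the request. The returned future completes with the
     * reply, or fails with a TimeoutException if none arrives in time.
     */
    CompletableFuture<String> register(String correlationId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // Fail the future if no reply arrives within the timeout (Java 9+).
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Drop the map entry once the future is done, whether by reply or timeout.
        reply.whenComplete((value, error) -> pending.remove(correlationId, reply));
        return reply;
    }

    /**
     * Call from the code that receives responses. Returns false when the id is
     * unknown or the request has already timed out.
     */
    boolean complete(String correlationId, String response) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(response);
    }
}
```

In a setup like the one described above, `register` would presumably be called just before the message goes to the outbound socket (for example with a 5000 ms timeout to mirror `completionTimeout(5000)`), and `complete` from whatever consumes the inbound socket. The caller can then attach a callback to the future instead of blocking a request thread.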