Well, it looks like the camel-netty component won't work for me. It appears that it opens the connection for each exchange. Am I reading that right? What I need is a persistent connection with automatic reconnects. Oh well, back to the drawing board.
On Wed, Aug 17, 2011 at 7:59 AM, James Carman <ja...@carmanconsulting.com> wrote: > That's what I've been staring at! :) Here's what I'm thinking I'm > going to need to write. I need an async processor that remembers the > AsyncCallback and associates it with a correlation id. Then, when > another exchange comes in that has the same correlation id, it will > lookup the previous callback and say that it's done. I have a lot of > questions, though. I've never had to get so "down and dirty" with > Camel before. The components have just worked for me "off the shelf." > > 1. Do I just copy the input message of the Exchange that comes in > second to the output message of the originating exchange? > 2. How do I do a timeout for the original caller (the CXF request)? > 3. How do I detect that the caller has timed out if they do? > > I'm sure I'll have more questions, but these are the ones off the top > of my head. > > On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote: >> James I think the rest of your puzzle is solved by Camel's async API, >> you might have to check if your task is done, maybe your >> requestResponse populates some collection of responses and provides >> some API to return the response given a correlationID. >> Stare at the async docs [1] a few more times and I'm sure you'll find >> your answer. >> >> [1] http://camel.apache.org/async.html >> >> Taariq >> >> On Tue, Aug 16, 2011 at 11:16 PM, James Carman >> <ja...@carmanconsulting.com> wrote: >>> No worries! Thank you for your help. It helped me understand a bit >>> more about how these aggregators work.. However, I still don't >>> understand how to take care of my problem. I guess I'm going to have >>> to roll my own processor or something. >>> >>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote: >>>> Hmmm. >>>> Maybe others can help with that if it's possible, I haven't had to wrestle >>>> with it. >>>> >>>> In my case it is actually a cxf service too, but it's asynchronous and I >>>> send the response once I have it, indicating either timeout or the actual >>>> response. >>>> >>>> Sorry I responded to your question without going back to see your other >>>> posts. >>>> >>>> Taariq >>>> >>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> >>>> wrote: >>>> >>>>> In my case, the originating request comes from CXF. How do I send the >>>>> aggregated response back to CXF? >>>>> >>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>>> The consumer that handles the aggregated/timed-out request or response. >>>>>> >>>>>> I have to resend a few times if it's the request, I simply feed it back >>>>>> into "direct:socketRequestRoute" with the header for the number of retry >>>>>> attempts incremented. >>>>>> If it's the response I can forward to some process. >>>>>> >>>>>> Taariq >>>>>> >>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> >>>>>> wrote: >>>>>> >>>>>>> What's listening on the: >>>>>>> >>>>>>> to("direct:requestResponse") >>>>>>> >>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> >>>>>>> wrote: >>>>>>>> Sure >>>>>>>> >>>>>>>> You can of course solve what I've described many ways, but I'll >>>>>>>> explain using 3 routes as that's what I used. >>>>>>>> >>>>>>>> This first route is the main route I mentioned earlier, so you send >>>>>>>> your socket messages here and it's multicast to both the aggregator >>>>>>>> and to the socket. >>>>>>>> >>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", >>>>>>>> "someOutboundSocketEndpoint"); >>>>>>>> >>>>>>>> >>>>>>>> This next route will aggregate, both requests and responses are sent >>>>>>>> here as you envisaged. >>>>>>>> from("direct:requestResponseAggregator"). >>>>>>>> .aggregate(header("someCorrellationId"), >>>>>>>> requestResponseAggregator) >>>>>>>> .completionSize(2) >>>>>>>> .completionTimeout(5000) >>>>>>>> .to("direct:requestResponse"); //Here you can send the >>>>>>>> "aggregated" message, in my case it's only the response I forward >>>>>>>> unless there's a timeout, then I forward the request of course. >>>>>>>> >>>>>>>> Finally the route that consumes the socket responses. >>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator"); >>>>>>>> //this headerEnricher doesn't have to be a processor, you have many >>>>>>>> options to add a header. >>>>>>>> >>>>>>>> If that's not clear feel free to ask. >>>>>>>> >>>>>>>> Taariq >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman >>>>>>>> <ja...@carmanconsulting.com> wrote: >>>>>>>>> Care to share an example? I'm not picturing it. >>>>>>>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> >>>>>>>>> wrote: >>>>>>>>>> Hi James >>>>>>>>>> >>>>>>>>>> I did that too for what it's worth. >>>>>>>>>> I send the message to a route that forwards to both the aggregator >>>>>>>>>> and to the socket. >>>>>>>>>> When the response comes in I use an enricher to add the ID to the >>>>>>>>>> headers and then forward to the aggregator. >>>>>>>>>> >>>>>>>>>> Taariq >>>>>>>>>> >>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman >>>>>>>>>> <ja...@carmanconsulting.com> wrote: >>>>>>>>>> >>>>>>>>>>> Willem, >>>>>>>>>>> >>>>>>>>>>> Thank you for your help. I don't think this is doing exactly what I >>>>>>>>>>> need, though. The real trick here is the asynchronous nature of the >>>>>>>>>>> "server" on the other end of this situation. I thought about using >>>>>>>>>>> an >>>>>>>>>>> aggregator to make sure the response gets matched up with the >>>>>>>>>>> request >>>>>>>>>>> using a correlation id. The aggregator wouldn't aggregate multiple >>>>>>>>>>> responses together into one, it would just make sure it matches the >>>>>>>>>>> correct response with its request. Does this sound like a valid >>>>>>>>>>> approach? If so, how the heck do I go about it? :) >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> >>>>>>>>>>> James >>>>>>>>>>> >>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang >>>>>>>>>>> <willem.ji...@gmail.com> wrote: >>>>>>>>>>>> Hi James, >>>>>>>>>>>> >>>>>>>>>>>> Camel async process engine already provides the way that you want. >>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example. >>>>>>>>>>>> >>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup >>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup >>>>>>>>>>>> >>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea >>>>>>>>>>>>> Hadrian<hzbar...@gmail.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi James, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few >>>>>>>>>>>>>> thoughts. I >>>>>>>>>>>>>> assume want to use camel-netty [1] to send messages to your >>>>>>>>>>>>>> sever (if you >>>>>>>>>>>>>> have your own code that does that, you can use it too, but you'd >>>>>>>>>>>>>> have to >>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is >>>>>>>>>>>>>> converting a >>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your >>>>>>>>>>>>>> exchange as >>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and >>>>>>>>>>>>>> compose it >>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe your >>>>>>>>>>>>>> best bet is >>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the >>>>>>>>>>>>>> examples using >>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is >>>>>>>>>>>>>> stateless so >>>>>>>>>>>>>> you'll need a correlationId, which you must have already and >>>>>>>>>>>>>> something to >>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write >>>>>>>>>>>>>> your own. >>>>>>>>>>>>>> If you used jms you would need to use both a correlationId and a >>>>>>>>>>>>>> replyTo >>>>>>>>>>>>>> queue. >>>>>>>>>>>>>> >>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId"); >>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Perhaps a bit more information might be appropriate here. >>>>>>>>>>>>> Eventually, >>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of >>>>>>>>>>>>> course). So, I would need to either block the request thread, >>>>>>>>>>>>> waiting >>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous >>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done >>>>>>>>>>>>> with >>>>>>>>>>>>> less http request threads) to do more of a continuation thing. >>>>>>>>>>>>> >>>>>>>>>>>>> We already have a correlation id. The "protocol" requires one >>>>>>>>>>>>> and the >>>>>>>>>>>>> server process just echos it back in the response message. >>>>>>>>>>>>> >>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you >>>>>>>>>>>>>> cannot use >>>>>>>>>>>>>> the same you can do a second transformation/correlation using a >>>>>>>>>>>>>> claim-check >>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement >>>>>>>>>>>>>> your own (in >>>>>>>>>>>>>> memory) persistence and correlation. You can also use a >>>>>>>>>>>>>> resequencer [4] if >>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get >>>>>>>>>>>>>> the replies >>>>>>>>>>>>>> when they become available, and you can control that. >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> I don't think a resequencer is necessary. I don't want to >>>>>>>>>>>>> guarantee >>>>>>>>>>>>> the ordering. I'm mostly interested in throughput here. So, if a >>>>>>>>>>>>> message comes in after another, but it can be processed faster, >>>>>>>>>>>>> so be >>>>>>>>>>>>> it. >>>>>>>>>>>>> >>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more >>>>>>>>>>>>>> thought, but I >>>>>>>>>>>>>> hope this helps. >>>>>>>>>>>>>> Hadrian >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> You have been very helpful. Thank you for taking the time! >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Willem >>>>>>>>>>>> ---------------------------------- >>>>>>>>>>>> FuseSource >>>>>>>>>>>> Web: http://www.fusesource.com >>>>>>>>>>>> Blog: http://willemjiang.blogspot.com (English) >>>>>>>>>>>> http://jnn.javaeye.com (Chinese) >>>>>>>>>>>> Twitter: willemjiang >>>>>>>>>>>> Weibo: willemjiang >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>> >>>> >>> >> >