That doesn't sound right, what have you read? Logs/docs? And are you using keep-alive?
Taariq On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote: > Well, it looks like the camel-netty component won't work for me. It > appears that it opens the connection for each exchange. Am I reading > that right? What I need is a persistent connection with automatic > reconnects. Oh well, back to the drawing board. > > On Wed, Aug 17, 2011 at 7:59 AM, James Carman > <ja...@carmanconsulting.com> wrote: >> That's what I've been staring at! :) Here's what I'm thinking I'm >> going to need to write. I need an async processor that remembers the >> AsyncCallback and associates it with a correlation id. Then, when >> another exchange comes in that has the same correlation id, it will >> lookup the previous callback and say that it's done. I have a lot of >> questions, though. I've never had to get so "down and dirty" with >> Camel before. The components have just worked for me "off the shelf." >> >> 1. Do I just copy the input message of the Exchange that comes in >> second to the output message of the originating exchange? >> 2. How do I do a timeout for the original caller (the CXF request)? >> 3. How do I detect that the caller has timed out if they do? >> >> I'm sure I'll have more questions, but these are the ones off the top >> of my head. >> >> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote: >>> James I think the rest of your puzzle is solved by Camel's async API, >>> you might have to check if your task is done, maybe your >>> requestResponse populates some collection of responses and provides >>> some API to return the response given a correlationID. >>> Stare at the async docs [1] a few more times and I'm sure you'll find >>> your answer. >>> >>> [1] http://camel.apache.org/async.html >>> >>> Taariq >>> >>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman >>> <ja...@carmanconsulting.com> wrote: >>>> No worries! Thank you for your help. It helped me understand a bit >>>> more about how these aggregators work.. However, I still don't >>>> understand how to take care of my problem. I guess I'm going to have >>>> to roll my own processor or something. >>>> >>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>> Hmmm. >>>>> Maybe others can help with that if it's possible, I haven't had to >>>>> wrestle with it. >>>>> >>>>> In my case it is actually a cxf service too, but it's asynchronous and I >>>>> send the response once I have it, indicating either timeout or the actual >>>>> response. >>>>> >>>>> Sorry I responded to your question without going back to see your other >>>>> posts. >>>>> >>>>> Taariq >>>>> >>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> >>>>> wrote: >>>>> >>>>>> In my case, the originating request comes from CXF. How do I send the >>>>>> aggregated response back to CXF? >>>>>> >>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>>>> The consumer that handles the aggregated/timed-out request or response. >>>>>>> >>>>>>> I have to resend a few times if it's the request, I simply feed it back >>>>>>> into "direct:socketRequestRoute" with the header for the number of >>>>>>> retry attempts incremented. >>>>>>> If it's the response I can forward to some process. >>>>>>> >>>>>>> Taariq >>>>>>> >>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> >>>>>>> wrote: >>>>>>> >>>>>>>> What's listening on the: >>>>>>>> >>>>>>>> to("direct:requestResponse") >>>>>>>> >>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> >>>>>>>> wrote: >>>>>>>>> Sure >>>>>>>>> >>>>>>>>> You can of course solve what I've described many ways, but I'll >>>>>>>>> explain using 3 routes as that's what I used. >>>>>>>>> >>>>>>>>> This first route is the main route I mentioned earlier, so you send >>>>>>>>> your socket messages here and it's multicast to both the aggregator >>>>>>>>> and to the socket. >>>>>>>>> >>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", >>>>>>>>> "someOutboundSocketEndpoint"); >>>>>>>>> >>>>>>>>> >>>>>>>>> This next route will aggregate, both requests and responses are sent >>>>>>>>> here as you envisaged. >>>>>>>>> from("direct:requestResponseAggregator"). >>>>>>>>> .aggregate(header("someCorrellationId"), >>>>>>>>> requestResponseAggregator) >>>>>>>>> .completionSize(2) >>>>>>>>> .completionTimeout(5000) >>>>>>>>> .to("direct:requestResponse"); //Here you can send the >>>>>>>>> "aggregated" message, in my case it's only the response I forward >>>>>>>>> unless there's a timeout, then I forward the request of course. >>>>>>>>> >>>>>>>>> Finally the route that consumes the socket responses. >>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator"); >>>>>>>>> //this headerEnricher doesn't have to be a processor, you have many >>>>>>>>> options to add a header. >>>>>>>>> >>>>>>>>> If that's not clear feel free to ask. >>>>>>>>> >>>>>>>>> Taariq >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman >>>>>>>>> <ja...@carmanconsulting.com> wrote: >>>>>>>>>> Care to share an example? I'm not picturing it. >>>>>>>>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>> Hi James >>>>>>>>>>> >>>>>>>>>>> I did that too for what it's worth. >>>>>>>>>>> I send the message to a route that forwards to both the aggregator >>>>>>>>>>> and to the socket. >>>>>>>>>>> When the response comes in I use an enricher to add the ID to the >>>>>>>>>>> headers and then forward to the aggregator. >>>>>>>>>>> >>>>>>>>>>> Taariq >>>>>>>>>>> >>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman >>>>>>>>>>> <ja...@carmanconsulting.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Willem, >>>>>>>>>>>> >>>>>>>>>>>> Thank you for your help. I don't think this is doing exactly what >>>>>>>>>>>> I >>>>>>>>>>>> need, though. The real trick here is the asynchronous nature of >>>>>>>>>>>> the >>>>>>>>>>>> "server" on the other end of this situation. I thought about >>>>>>>>>>>> using an >>>>>>>>>>>> aggregator to make sure the response gets matched up with the >>>>>>>>>>>> request >>>>>>>>>>>> using a correlation id. The aggregator wouldn't aggregate multiple >>>>>>>>>>>> responses together into one, it would just make sure it matches the >>>>>>>>>>>> correct response with its request. Does this sound like a valid >>>>>>>>>>>> approach? If so, how the heck do I go about it? :) >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> >>>>>>>>>>>> James >>>>>>>>>>>> >>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang >>>>>>>>>>>> <willem.ji...@gmail.com> wrote: >>>>>>>>>>>>> Hi James, >>>>>>>>>>>>> >>>>>>>>>>>>> Camel async process engine already provides the way that you want. >>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example. >>>>>>>>>>>>> >>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup >>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup >>>>>>>>>>>>> >>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea >>>>>>>>>>>>>> Hadrian<hzbar...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi James, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few >>>>>>>>>>>>>>> thoughts. I >>>>>>>>>>>>>>> assume want to use camel-netty [1] to send messages to your >>>>>>>>>>>>>>> sever (if you >>>>>>>>>>>>>>> have your own code that does that, you can use it too, but >>>>>>>>>>>>>>> you'd have to >>>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is >>>>>>>>>>>>>>> converting a >>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your >>>>>>>>>>>>>>> exchange as >>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and >>>>>>>>>>>>>>> compose it >>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe your >>>>>>>>>>>>>>> best bet is >>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the >>>>>>>>>>>>>>> examples using >>>>>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is >>>>>>>>>>>>>>> stateless so >>>>>>>>>>>>>>> you'll need a correlationId, which you must have already and >>>>>>>>>>>>>>> something to >>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could >>>>>>>>>>>>>>> write your own. >>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId and >>>>>>>>>>>>>>> a replyTo >>>>>>>>>>>>>>> queue. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId"); >>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here. >>>>>>>>>>>>>> Eventually, >>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of >>>>>>>>>>>>>> course). So, I would need to either block the request thread, >>>>>>>>>>>>>> waiting >>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous >>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done >>>>>>>>>>>>>> with >>>>>>>>>>>>>> less http request threads) to do more of a continuation thing. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We already have a correlation id. The "protocol" requires one >>>>>>>>>>>>>> and the >>>>>>>>>>>>>> server process just echos it back in the response message. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you >>>>>>>>>>>>>>> cannot use >>>>>>>>>>>>>>> the same you can do a second transformation/correlation using a >>>>>>>>>>>>>>> claim-check >>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement >>>>>>>>>>>>>>> your own (in >>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a >>>>>>>>>>>>>>> resequencer [4] if >>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you >>>>>>>>>>>>>>> get the replies >>>>>>>>>>>>>>> when they become available, and you can control that. >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> I don't think a resequencer is necessary. I don't want to >>>>>>>>>>>>>> guarantee >>>>>>>>>>>>>> the ordering. I'm mostly interested in throughput here. So, if >>>>>>>>>>>>>> a >>>>>>>>>>>>>> message comes in after another, but it can be processed faster, >>>>>>>>>>>>>> so be >>>>>>>>>>>>>> it. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more >>>>>>>>>>>>>>> thought, but I >>>>>>>>>>>>>>> hope this helps. >>>>>>>>>>>>>>> Hadrian >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> You have been very helpful. Thank you for taking the time! >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Willem >>>>>>>>>>>>> ---------------------------------- >>>>>>>>>>>>> FuseSource >>>>>>>>>>>>> Web: http://www.fusesource.com >>>>>>>>>>>>> Blog: http://willemjiang.blogspot.com (English) >>>>>>>>>>>>> http://jnn.javaeye.com (Chinese) >>>>>>>>>>>>> Twitter: willemjiang >>>>>>>>>>>>> Weibo: willemjiang >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>> >>>>> >>>> >>> >>