That doesn't sound right. What have you read, the logs or the docs?
And are you using keep-alive?

Taariq


On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:

> Well, it looks like the camel-netty component won't work for me.  It
> appears that it opens the connection for each exchange.  Am I reading
> that right?  What I need is a persistent connection with automatic
> reconnects.  Oh well, back to the drawing board.
> 
> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
> <ja...@carmanconsulting.com> wrote:
>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>> going to need to write.  I need an async processor that remembers the
>> AsyncCallback and associates it with a correlation id.  Then, when
>> another exchange comes in that has the same correlation id, it will
>> look up the previous callback and say that it's done.  I have a lot of
>> questions, though.  I've never had to get so "down and dirty" with
>> Camel before.  The components have just worked for me "off the shelf."
>> 
>> 1.  Do I just copy the input message of the Exchange that comes in
>> second to the output message of the originating exchange?
>> 2.  How do I do a timeout for the original caller (the CXF request)?
>> 3.  How do I detect that the caller has timed out if they do?
>> 
>> I'm sure I'll have more questions, but these are the ones off the top
>> of my head.
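
The correlation idea described above, including the timeout questions (2 and 3), can be sketched roughly as follows. This is a minimal plain-Java sketch, not Camel code: it assumes string message bodies and uses a CompletableFuture in place of Camel's AsyncCallback. The names PendingReplies, register and complete are hypothetical, and orTimeout needs Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not Camel API): pairs late replies with waiting callers by correlation id. */
public class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /**
     * Call when the request is sent. The returned future completes with the reply body,
     * or fails with a TimeoutException if no reply arrives within timeoutMillis.
     */
    public CompletableFuture<String> register(String correlationId, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        // Forget the entry however the future ends (reply or timeout).
        reply.whenComplete((body, error) -> pending.remove(correlationId, reply));
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return reply;
    }

    /**
     * Call when a reply arrives. Returns false when nobody is waiting any more,
     * i.e. the id is unknown or the original caller has already timed out.
     */
    public boolean complete(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }
}
```

In this sketch the boolean returned by complete is how the reply side would notice that the caller has already timed out; whether the reply body is copied onto the original exchange's out message is left to the surrounding code.
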
>> 
>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote:
>>> James, I think the rest of your puzzle is solved by Camel's async API.
>>> You might have to check whether your task is done; maybe your
>>> requestResponse populates some collection of responses and provides
>>> some API to return the response given a correlationID.
>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>> your answer.
>>> 
>>> [1] http://camel.apache.org/async.html
>>> 
>>> Taariq
>>> 
>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>> <ja...@carmanconsulting.com> wrote:
>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>>> more about how these aggregators work.  However, I still don't
>>>> understand how to take care of my problem.  I guess I'm going to have
>>>> to roll my own processor or something.
>>>> 
>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>> Hmmm.
>>>>> Maybe others can help with that if it's possible, I haven't had to 
>>>>> wrestle with it.
>>>>> 
>>>>> In my case it is actually a CXF service too, but it's asynchronous and I 
>>>>> send the response once I have it, indicating either timeout or the actual 
>>>>> response.
>>>>> 
>>>>> Sorry I responded to your question without going back to see your other 
>>>>> posts.
>>>>> 
>>>>> Taariq
>>>>> 
>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> 
>>>>> wrote:
>>>>> 
>>>>>> In my case, the originating request comes from CXF.  How do I send the
>>>>>> aggregated response back to CXF?
>>>>>> 
>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>>>>> 
>>>>>>> If it's the request, I have to resend it a few times: I simply feed it
>>>>>>> back into "direct:socketRequestRoute" with the header for the number of
>>>>>>> retry attempts incremented.
>>>>>>> If it's the response I can forward to some process.
>>>>>>> 
>>>>>>> Taariq
>>>>>>> 
>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> What's listening on the:
>>>>>>>> 
>>>>>>>> to("direct:requestResponse")
>>>>>>>> 
>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> 
>>>>>>>> wrote:
>>>>>>>>> Sure
>>>>>>>>> 
>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>>>>>>>> explain using 3 routes as that's what I used.
>>>>>>>>> 
>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>>>>>>>> and to the socket.
>>>>>>>>> 
>>>>>>>>> from("direct:socketRequestRoute")
>>>>>>>>>     .multicast()
>>>>>>>>>     .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> This next route does the aggregation; both requests and responses are
>>>>>>>>> sent here, as you envisaged.
>>>>>>>>> // Here you can send the "aggregated" message. In my case I forward
>>>>>>>>> // only the response, unless there's a timeout, in which case I
>>>>>>>>> // forward the request.
>>>>>>>>> from("direct:requestResponseAggregator")
>>>>>>>>>     .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>>>>>>     .completionSize(2)
>>>>>>>>>     .completionTimeout(5000)
>>>>>>>>>     .to("direct:requestResponse");
>>>>>>>>> 
>>>>>>>>> Finally, the route that consumes the socket responses.
>>>>>>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>>>>>>> // options for adding a header.
>>>>>>>>> from(someInboundSocketEndpoint)
>>>>>>>>>     .processRef("headerEnricher")
>>>>>>>>>     .to("direct:requestResponseAggregator");
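
To make the pairing rule behind these routes explicit, here is a plain-Java model of what the aggregator is described as doing: forward the response when both parts arrive, and resend the request with its retry count incremented on timeout. It is only a sketch under assumptions and does not implement Camel's AggregationStrategy interface. RequestResponsePairing, Msg, onComplete, onTimeout and maxRetries are hypothetical names, and the retry limit is not something the thread specifies.

```java
import java.util.Optional;

/** Plain-Java model of the request/response pairing rule; not Camel API. */
public class RequestResponsePairing {

    /** Hypothetical message shape: correlation id, request/response flag, body, retry counter. */
    public static final class Msg {
        public final String correlationId;
        public final boolean isResponse;
        public final String body;
        public final int retries;

        public Msg(String correlationId, boolean isResponse, String body, int retries) {
            this.correlationId = correlationId;
            this.isResponse = isResponse;
            this.body = body;
            this.retries = retries;
        }
    }

    /** Models completionSize(2): both parts arrived, so only the response is forwarded. */
    public static Msg onComplete(Msg first, Msg second) {
        return second.isResponse ? second : first;
    }

    /**
     * Models completionTimeout: only the request arrived. Returns a copy of the request
     * with the retry counter incremented, or empty once maxRetries (an assumed limit) is reached.
     */
    public static Optional<Msg> onTimeout(Msg request, int maxRetries) {
        if (request.retries >= maxRetries) {
            return Optional.empty();
        }
        return Optional.of(new Msg(request.correlationId, false, request.body, request.retries + 1));
    }
}
```

In the routes above, the message returned by onTimeout would correspond to what gets fed back into "direct:socketRequestRoute".
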
>>>>>>>>> 
>>>>>>>>> If that's not clear feel free to ask.
>>>>>>>>> 
>>>>>>>>> Taariq
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>>>>> 
>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> 
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi James
>>>>>>>>>>> 
>>>>>>>>>>> I did that too for what it's worth.
>>>>>>>>>>> I send the message to a route that forwards to both the aggregator 
>>>>>>>>>>> and to the socket.
>>>>>>>>>>> When the response comes in I use an enricher to add the ID to the 
>>>>>>>>>>> headers and then forward to the aggregator.
>>>>>>>>>>> 
>>>>>>>>>>> Taariq
>>>>>>>>>>> 
>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman 
>>>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Willem,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>>>>>> responses together into one; it would just make sure it matches the
>>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> 
>>>>>>>>>>>> James
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang 
>>>>>>>>>>>> <willem.ji...@gmail.com> wrote:
>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Camel's async processing engine already provides what you want.
>>>>>>>>>>>>> You can take a look at the camel-cxf code [1][2] for some examples.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hzbar...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server
>>>>>>>>>>>>>>> (if you have your own code that does that, you can use it too, but you'd
>>>>>>>>>>>>>>> have to write your own Processor or Component). IIUC, your scenario is
>>>>>>>>>>>>>>> converting a 2x in-only to a 1x in-out async MEP. You should then treat
>>>>>>>>>>>>>>> your exchange as an async in-out and let your framework (Camel)
>>>>>>>>>>>>>>> decompose it and compose it back again. I would not keep threads
>>>>>>>>>>>>>>> blocked, so I believe your best bet is using the Camel async messaging
>>>>>>>>>>>>>>> [2] and Futures (look at the examples using asyncSend* and
>>>>>>>>>>>>>>> asyncCallback*). The issue is that Camel is stateless, so you'll need a
>>>>>>>>>>>>>>> correlationId, which you must have already, and something to keep your
>>>>>>>>>>>>>>> state. A good bet would be jms [3], or you could write your own. If you
>>>>>>>>>>>>>>> used jms you would need to use both a correlationId and a replyTo queue.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>>>>>>> for a reply, or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one, and the
>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You may have to play a bit with the correlationId, and if you cannot use
>>>>>>>>>>>>>>> the same one you can do a second transformation/correlation using a
>>>>>>>>>>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>>>>>>>>>>>>> implement your own (in-memory) persistence and correlation. You can
>>>>>>>>>>>>>>> also use a resequencer [4] if you want to enforce the order. If you use
>>>>>>>>>>>>>>> asyncCallback, you get the replies when they become available, and you
>>>>>>>>>>>>>>> can control that.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> It's an interesting scenario. I'll definitely give it more thought, but
>>>>>>>>>>>>>>> I hope this helps.
>>>>>>>>>>>>>>> Hadrian
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Willem
>>>>>>>>>>>>> ----------------------------------
>>>>>>>>>>>>> FuseSource
>>>>>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>>>>> Weibo: willemjiang
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
