Well, it looks like the camel-netty component won't work for me.  It
appears that it opens a new connection for each exchange.  Am I reading
that right?  What I need is a persistent connection with automatic
reconnects.  Oh well, back to the drawing board.
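
For the record, roughly the shape of what I think I need (a plain-sockets
sketch, nothing Camel- or Netty-specific; host, port, and the handler are
all made up):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class ReconnectingClient implements Runnable {

    private final String host;
    private final int port;
    private volatile boolean running = true;

    public ReconnectingClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() {
        while (running) {
            try {
                Socket socket = new Socket(host, port);
                try {
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(socket.getInputStream()));
                    String line;
                    while ((line = in.readLine()) != null) {
                        handleResponse(line); // hand the raw response off for correlation
                    }
                } finally {
                    socket.close();
                }
            } catch (IOException e) {
                // connection refused or dropped -- fall through and reconnect
            }
            try {
                Thread.sleep(5000); // crude backoff before reconnecting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    private void handleResponse(String line) {
        // placeholder: this is where the response would be fed back into the route
    }
}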

On Wed, Aug 17, 2011 at 7:59 AM, James Carman
<ja...@carmanconsulting.com> wrote:
> That's what I've been staring at! :)  Here's what I think I'm going to
> need to write.  I need an async processor that remembers the
> AsyncCallback and associates it with a correlation id.  Then, when
> another exchange comes in with the same correlation id, it will look up
> the previous callback and say that it's done.  I have a lot of
> questions, though.  I've never had to get so "down and dirty" with
> Camel before.  The components have just worked for me "off the shelf."
>
> 1.  Do I just copy the input message of the Exchange that comes in
> second to the output message of the originating exchange?
> 2.  How do I do a timeout for the original caller (the CXF request)?
> 3.  How do I detect that the caller has timed out if they do?
>
> I'm sure I'll have more questions, but these are the ones off the top
> of my head.
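>
> For what it's worth, here's the rough shape of the processor I'm
> picturing (just a sketch; the header name and the map are my own
> guesses, and it ignores timeouts for now):
>
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
>
> import org.apache.camel.AsyncCallback;
> import org.apache.camel.AsyncProcessor;
> import org.apache.camel.Exchange;
>
> public class CallbackRegistryProcessor implements AsyncProcessor {
>
>     private final Map<String, PendingRequest> pending =
>             new ConcurrentHashMap<String, PendingRequest>();
>
>     public boolean process(Exchange exchange, AsyncCallback callback) {
>         String id = exchange.getIn().getHeader("correlationId", String.class);
>         PendingRequest original = pending.remove(id);
>         if (original == null) {
>             // first time we see this id: remember the exchange and its callback
>             pending.put(id, new PendingRequest(exchange, callback));
>             return false; // continue asynchronously; done(false) comes later
>         }
>         // second exchange with the same id: this is the response
>         original.exchange.getOut().setBody(exchange.getIn().getBody());
>         original.callback.done(false);  // complete the original exchange
>         callback.done(true);            // this (response) exchange is done
>         return true;
>     }
>
>     public void process(Exchange exchange) throws Exception {
>         // synchronous variant required by the Processor interface
>         process(exchange, new AsyncCallback() {
>             public void done(boolean doneSync) {
>                 // no-op
>             }
>         });
>     }
>
>     private static final class PendingRequest {
>         private final Exchange exchange;
>         private final AsyncCallback callback;
>
>         private PendingRequest(Exchange exchange, AsyncCallback callback) {
>             this.exchange = exchange;
>             this.callback = callback;
>         }
>     }
> }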
>
> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote:
>> James, I think the rest of your puzzle is solved by Camel's async API.
>> You might have to check whether your task is done; maybe your
>> requestResponse populates some collection of responses and provides
>> some API to return the response for a given correlationID.
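>>
>> Something along these lines, maybe (just a sketch; the class and method
>> names are made up):
>>
>> import java.util.concurrent.ArrayBlockingQueue;
>> import java.util.concurrent.BlockingQueue;
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.concurrent.ConcurrentMap;
>> import java.util.concurrent.TimeUnit;
>>
>> public class ResponseRegistry {
>>
>>     private final ConcurrentMap<String, BlockingQueue<Object>> responses =
>>             new ConcurrentHashMap<String, BlockingQueue<Object>>();
>>
>>     // called from the route when a response arrives
>>     public void put(String correlationId, Object response) {
>>         queueFor(correlationId).offer(response);
>>     }
>>
>>     // called by whoever is waiting for the reply; returns null on timeout
>>     public Object get(String correlationId, long timeoutMillis)
>>             throws InterruptedException {
>>         try {
>>             return queueFor(correlationId).poll(timeoutMillis, TimeUnit.MILLISECONDS);
>>         } finally {
>>             responses.remove(correlationId);
>>         }
>>     }
>>
>>     private BlockingQueue<Object> queueFor(String correlationId) {
>>         BlockingQueue<Object> queue = responses.get(correlationId);
>>         if (queue == null) {
>>             responses.putIfAbsent(correlationId, new ArrayBlockingQueue<Object>(1));
>>             queue = responses.get(correlationId);
>>         }
>>         return queue;
>>     }
>> }
>>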
>> Stare at the async docs [1] a few more times and I'm sure you'll find
>> your answer.
>>
>> [1] http://camel.apache.org/async.html
>>
>> Taariq
>>
>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>> No worries!  Thank you for your help.  It helped me understand a bit
>>> more about how these aggregators work.  However, I still don't
>>> understand how to take care of my problem.  I guess I'm going to have
>>> to roll my own processor or something.
>>>
>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> Hmmm.
>>>> Maybe others can help with that if it's possible; I haven't had to
>>>> wrestle with it.
>>>>
>>>> In my case it is actually a CXF service too, but it's asynchronous, and I
>>>> send the response once I have it, indicating either a timeout or the
>>>> actual response.
>>>>
>>>> Sorry I responded to your question without going back to see your other 
>>>> posts.
>>>>
>>>> Taariq
>>>>
>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> 
>>>> wrote:
>>>>
>>>>> In my case, the originating request comes from CXF.  How do I send the
>>>>> aggregated response back to CXF?
>>>>>
>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>>>>
>>>>>> If it's the request, I have to resend a few times; I simply feed it back
>>>>>> into "direct:socketRequestRoute" with the header for the number of retry
>>>>>> attempts incremented.
>>>>>> If it's the response I can forward to some process.
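>>>>>>
>>>>>> Roughly like this, with made-up header and endpoint names:
>>>>>>
>>>>>> // Inside a RouteBuilder's configure(); "timedOut", "retryAttempts" and
>>>>>> // "direct:someResponseProcess" are made-up names.
>>>>>> from("direct:requestResponse")
>>>>>>     .choice()
>>>>>>         .when(header("timedOut").isEqualTo(true))
>>>>>>             .process(new Processor() {
>>>>>>                 public void process(Exchange exchange) throws Exception {
>>>>>>                     Integer retries = exchange.getIn()
>>>>>>                             .getHeader("retryAttempts", Integer.class);
>>>>>>                     int next = (retries == null) ? 1 : retries + 1;
>>>>>>                     exchange.getIn().setHeader("retryAttempts", next);
>>>>>>                 }
>>>>>>             })
>>>>>>             .to("direct:socketRequestRoute")
>>>>>>         .otherwise()
>>>>>>             .to("direct:someResponseProcess")
>>>>>>     .end();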
>>>>>>
>>>>>> Taariq
>>>>>>
>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> 
>>>>>> wrote:
>>>>>>
>>>>>>> What's listening on the:
>>>>>>>
>>>>>>> to("direct:requestResponse")
>>>>>>>
>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> 
>>>>>>> wrote:
>>>>>>>> Sure
>>>>>>>>
>>>>>>>> You can of course solve what I've described in many ways, but I'll
>>>>>>>> explain using 3 routes, as that's what I used.
>>>>>>>>
>>>>>>>> This first route is the main route I mentioned earlier: you send your
>>>>>>>> socket messages here, and each one is multicast to both the aggregator
>>>>>>>> and the socket.
>>>>>>>>
>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>>>>  "someOutboundSocketEndpoint");
>>>>>>>>
>>>>>>>>
>>>>>>>> This next route does the aggregation; both requests and responses are
>>>>>>>> sent here, as you envisaged.
>>>>>>>> from("direct:requestResponseAggregator").
>>>>>>>>                .aggregate(header("someCorrellationId"),
>>>>>>>> requestResponseAggregator)
>>>>>>>>                .completionSize(2)
>>>>>>>>                .completionTimeout(5000)
>>>>>>>>                .to("direct:requestResponse"); //Here you can send the
>>>>>>>> "aggregated" message, in my case it's only the response I forward
>>>>>>>> unless there's a timeout, then I forward the request of course.
>>>>>>>>
>>>>>>>> Finally, the route that consumes the socket responses:
>>>>>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>>>>>> // options for adding a header.
>>>>>>>> from(someInboundSocketEndpoint)
>>>>>>>>     .processRef("headerEnricher")
>>>>>>>>     .to("direct:requestResponseAggregator");
>>>>>>>>
>>>>>>>> If that's not clear feel free to ask.
>>>>>>>>
>>>>>>>> Taariq
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>>>>
>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> 
>>>>>>>>> wrote:
>>>>>>>>>> Hi James
>>>>>>>>>>
>>>>>>>>>> I did that too for what it's worth.
>>>>>>>>>> I send the message to a route that forwards to both the aggregator 
>>>>>>>>>> and to the socket.
>>>>>>>>>> When the response comes in I use an enricher to add the ID to the 
>>>>>>>>>> headers and then forward to the aggregator.
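>>>>>>>>>>
>>>>>>>>>> The enricher itself is tiny; something like this (how you pull the
>>>>>>>>>> ID out of the body is obviously protocol-specific):
>>>>>>>>>>
>>>>>>>>>> import org.apache.camel.Exchange;
>>>>>>>>>> import org.apache.camel.Processor;
>>>>>>>>>>
>>>>>>>>>> public class HeaderEnricher implements Processor {
>>>>>>>>>>     public void process(Exchange exchange) throws Exception {
>>>>>>>>>>         String body = exchange.getIn().getBody(String.class);
>>>>>>>>>>         // placeholder: extract the correlation id from the wire format
>>>>>>>>>>         String correlationId = body.substring(0, 8);
>>>>>>>>>>         // same header the aggregator correlates on
>>>>>>>>>>         exchange.getIn().setHeader("someCorrelationId", correlationId);
>>>>>>>>>>     }
>>>>>>>>>> }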
>>>>>>>>>>
>>>>>>>>>> Taariq
>>>>>>>>>>
>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman 
>>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Willem,
>>>>>>>>>>>
>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>>>>> "server" on the other end of this situation.  I thought about using 
>>>>>>>>>>> an
>>>>>>>>>>> aggregator to make sure the response gets matched up with the 
>>>>>>>>>>> request
>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
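>>>>>>>>>>>
>>>>>>>>>>> Something like this is what I have in mind for the strategy (just
>>>>>>>>>>> a guess on my part; the class and property names are made up):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.camel.Exchange;
>>>>>>>>>>> import org.apache.camel.processor.aggregate.AggregationStrategy;
>>>>>>>>>>>
>>>>>>>>>>> public class RequestResponsePairingStrategy implements AggregationStrategy {
>>>>>>>>>>>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>>>>>>>>>>>         if (oldExchange == null) {
>>>>>>>>>>>             return newExchange; // first arrival: the request
>>>>>>>>>>>         }
>>>>>>>>>>>         // second arrival is the matching response; keep it, and stash
>>>>>>>>>>>         // the original request in case anything downstream needs it
>>>>>>>>>>>         newExchange.setProperty("originalRequest", oldExchange.getIn().getBody());
>>>>>>>>>>>         return newExchange;
>>>>>>>>>>>     }
>>>>>>>>>>> }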
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> James
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang 
>>>>>>>>>>> <willem.ji...@gmail.com> wrote:
>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>
>>>>>>>>>>>> The Camel async processing engine already provides what you want.
>>>>>>>>>>>> You can take a look at the camel-cxf code [1][2] for some examples.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>>>>
>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea 
>>>>>>>>>>>>> Hadrian<hzbar...@gmail.com>
>>>>>>>>>>>>>  wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few
>>>>>>>>>>>>>> thoughts. I assume you want to use camel-netty [1] to send
>>>>>>>>>>>>>> messages to your server (if you have your own code that does
>>>>>>>>>>>>>> that, you can use it too, but you'd have to write your own
>>>>>>>>>>>>>> Processor or Component). If I understand it correctly, your
>>>>>>>>>>>>>> scenario is converting a 2x in-only MEP into a 1x in-out async
>>>>>>>>>>>>>> MEP. You should then treat your exchange as an async in-out and
>>>>>>>>>>>>>> let your framework (Camel) decompose it and compose it back
>>>>>>>>>>>>>> again. I would not keep threads blocked, so I believe your best
>>>>>>>>>>>>>> bet is using the Camel async messaging [2] and Futures (look at
>>>>>>>>>>>>>> the examples using asyncSend* and asyncCallback*). The issue is
>>>>>>>>>>>>>> that Camel is stateless, so you'll need a correlationId, which
>>>>>>>>>>>>>> you must already have, and something to keep your state. A good
>>>>>>>>>>>>>> bet would be JMS [3], or you could write your own. If you used
>>>>>>>>>>>>>> JMS you would need both a correlationId and a replyTo queue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  
>>>>>>>>>>>>> Eventually,
>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>>>>> course).  So, I would need to either block the request thread,
>>>>>>>>>>>>> waiting for a reply, or perhaps check out the new Servlet 3.0
>>>>>>>>>>>>> asynchronous processing stuff (I'm thinking this might help us get
>>>>>>>>>>>>> more done with fewer HTTP request threads) to do more of a
>>>>>>>>>>>>> continuation thing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one,
>>>>>>>>>>>>> and the server process just echoes it back in the response message.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You may have to play a bit with the correlationId, and if you
>>>>>>>>>>>>>> cannot use the same one you can do a second
>>>>>>>>>>>>>> transformation/correlation using a claim-check sort of pattern.
>>>>>>>>>>>>>> If you don't want to use JMS you can implement your own
>>>>>>>>>>>>>> (in-memory) persistence and correlation. You can also use a
>>>>>>>>>>>>>> resequencer [4] if you want to enforce the order. If you use
>>>>>>>>>>>>>> asyncCallback, you get the replies when they become available,
>>>>>>>>>>>>>> and you can control that.
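>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, roughly (just a sketch; "direct:sendToServer" and
>>>>>>>>>>>>>> the variable names are made up):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // "template" is an org.apache.camel.ProducerTemplate, e.g. from
>>>>>>>>>>>>>> // camelContext.createProducerTemplate(); Synchronization is
>>>>>>>>>>>>>> // org.apache.camel.spi.Synchronization.
>>>>>>>>>>>>>> Future<Object> future = template.asyncCallbackRequestBody(
>>>>>>>>>>>>>>         "direct:sendToServer", requestBody,
>>>>>>>>>>>>>>         new Synchronization() {
>>>>>>>>>>>>>>             public void onComplete(Exchange exchange) {
>>>>>>>>>>>>>>                 // the reply is available here
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>             public void onFailure(Exchange exchange) {
>>>>>>>>>>>>>>                 // an exception during processing ends up here
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         });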
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to
>>>>>>>>>>>>> guarantee the ordering.  I'm mostly interested in throughput here.
>>>>>>>>>>>>> So, if a message that comes in after another can be processed
>>>>>>>>>>>>> faster, so be it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's an interesting scenario; I'll definitely give it more
>>>>>>>>>>>>>> thought, but I hope this helps.
>>>>>>>>>>>>>> Hadrian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Willem
>>>>>>>>>>>> ----------------------------------
>>>>>>>>>>>> FuseSource
>>>>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>>>>> Twitter: willemjiang
>>>>>>>>>>>> Weibo: willemjiang
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>
>>
>
