I am a commons committer so don't think it hasn't crossed my mind. :)
On Aug 28, 2011 11:48 PM, "Claus Ibsen" <claus.ib...@gmail.com> wrote:
> On Sat, Aug 27, 2011 at 10:19 AM, James Carman
> <ja...@carmanconsulting.com> wrote:
>> Well, I can tell you that it certainly didn't seem to work the way I
>> need it to work. I need a persistent connection (with automatic
>> reconnects). I also need it to be in/out, but asynchronous (the
>> current incoming message may or may not correspond to the most
>> recently sent message). For now, I've resorted to just rolling my own
>> solution by directly coding to the Netty API. I will re-visit with
>> Camel later I'm sure. Thanks for your help.
>>
>
> Well, you could also consider contributing improvements to the Camel
> components.
>
> The Camel community loves contributions:
> http://camel.apache.org/contributing.html
>
>> On Thu, Aug 25, 2011 at 1:16 AM, Taariq Levack <taar...@gmail.com> wrote:
>>> I expect that the connection will only be closed if the header
>>> NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.
>>>
>>> Glancing at the code I see what you mean, it's quite unlike MINA's producer
>>> which checks the session to see if it's connected and reuses it, but it may
>>> be that under the hood, yet further under the hood, hehe, way further down
>>> into netty's ClientBootstrap and beyond, the connection is being reused. I
>>> don't know for sure.
>>>
>>> This is from Netty front-page, "True connectionless datagram socket support
>>> (since 3.1)":
>>> And glancing at that bit elsewhere I think it's possible to do without this
>>> sort of plumbing, but you'd have to jump into netty code or docs to confirm.
>>>
>>> Depending on timing and other factors I would go ahead with a POC because it
>>> either works or it will work, a failing test from your POC will be most
>>> welcome.
>>>
>>> Taariq
>>>
>>>
>>> On Wed, Aug 24, 2011 at 12:43 PM, James Carman
>>> <ja...@carmanconsulting.com> wrote:
>>>
>>>> I have read the source:
>>>>
>>>> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>>>>
>>>> Take a look at the process() method. In there, there is a block of
>>>> code that does:
>>>>
>>>> ChannelFuture channelFuture;
>>>> final Channel channel;
>>>> try {
>>>>     channelFuture = openConnection(exchange, callback);
>>>>     channel = openChannel(channelFuture);
>>>> } catch (Exception e) {
>>>>     exchange.setException(e);
>>>>     callback.done(true);
>>>>     return true;
>>>> }
>>>>
>>>>
>>>> This is not inside an if block or anything and the openConnection()
>>>> method does actually open it, it isn't just returning a
>>>> previously-opened connection or anything.
>>>>
>>>> Perhaps I'm missing something (entirely possible), but it appears that
>>>> it's opening the connection every time the process() method is called.
>>>>
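For illustration, the reuse-or-reconnect behaviour asked for here (a persistent connection that is reopened only when it has dropped) could be sketched in plain Java as below. This is not Camel or Netty code: ReusableConnection and its connector and isOpen parameters are hypothetical names chosen for the sketch, and a real producer would plug in its own channel type and liveness check.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Camel or Netty): keeps one connection
 * and opens a new one only when the current one is missing or closed.
 */
final class ReusableConnection<C> {
    private final Supplier<C> connector; // assumption: opens a fresh connection
    private final Predicate<C> isOpen;   // assumption: true while the connection is usable
    private C current;                   // last connection handed out, or null
    private int opens;                   // number of times connector was invoked

    ReusableConnection(Supplier<C> connector, Predicate<C> isOpen) {
        this.connector = connector;
        this.isOpen = isOpen;
    }

    /** Returns the existing connection if still open, otherwise reconnects. */
    synchronized C get() {
        if (current == null || !isOpen.test(current)) {
            current = connector.get();
            opens++;
        }
        return current;
    }

    /** How many connections have been opened so far. */
    synchronized int openCount() {
        return opens;
    }
}
```

With a holder like this, each send would call get() instead of opening a connection unconditionally, so a new connection is only created after the previous one has been reported closed.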
>>>> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> > That doesn't sound right, what have you read? Logs/docs?
>>>> > And are you using keep-alive?
>>>> >
>>>> > Taariq
>>>> >
>>>> >
>>>> > On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> >
>>>> >> Well, it looks like the camel-netty component won't work for me. It
>>>> >> appears that it opens the connection for each exchange. Am I reading
>>>> >> that right? What I need is a persistent connection with automatic
>>>> >> reconnects. Oh well, back to the drawing board.
>>>> >>
>>>> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>>>> >> <ja...@carmanconsulting.com> wrote:
>>>> >>> That's what I've been staring at! :) Here's what I'm thinking I'm
>>>> >>> going to need to write. I need an async processor that remembers the
>>>> >>> AsyncCallback and associates it with a correlation id. Then, when
>>>> >>> another exchange comes in that has the same correlation id, it will
>>>> >>> look up the previous callback and say that it's done. I have a lot of
>>>> >>> questions, though. I've never had to get so "down and dirty" with
>>>> >>> Camel before. The components have just worked for me "off the shelf."
>>>> >>>
>>>> >>> 1. Do I just copy the input message of the Exchange that comes in
>>>> >>> second to the output message of the originating exchange?
>>>> >>> 2. How do I do a timeout for the original caller (the CXF request)?
>>>> >>> 3. How do I detect that the caller has timed out if they do?
>>>> >>>
>>>> >>> I'm sure I'll have more questions, but these are the ones off the top
>>>> >>> of my head.
>>>> >>>
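The idea of remembering a callback per correlation id, including the timeout asked about in questions 2 and 3, could be sketched in plain Java roughly as follows. This is only a sketch under assumptions: PendingReplies and its methods are hypothetical names, it uses java.util.concurrent.CompletableFuture (orTimeout needs Java 9 or later) instead of Camel's AsyncCallback, and in a Camel processor the point where the future completes is where the remembered callback would be signalled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry that matches late replies to waiting requests by correlation id. */
final class PendingReplies<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /**
     * Registers a request. The returned future completes with the reply, or
     * fails with a TimeoutException if nothing arrives within timeoutMillis.
     */
    CompletableFuture<R> register(String correlationId, long timeoutMillis) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("duplicate correlation id: " + correlationId);
        }
        // Fail the future once the deadline passes (Java 9+).
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Drop the entry however the future ends, so timed-out ids do not leak.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    /** Hands an incoming reply to its waiting request; false if nobody is waiting (e.g. it timed out). */
    boolean complete(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Number of requests still waiting for a reply. */
    int size() {
        return pending.size();
    }
}
```

Replies may arrive in any order: each one completes only the future registered under its own id, and a reply that arrives after the timeout is reported by complete() returning false.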
>>>> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote:
>>>> >>>> James I think the rest of your puzzle is solved by Camel's async API,
>>>> >>>> you might have to check if your task is done, maybe your
>>>> >>>> requestResponse populates some collection of responses and provides
>>>> >>>> some API to return the response given a correlationID.
>>>> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>>> >>>> your answer.
>>>> >>>>
>>>> >>>> [1] http://camel.apache.org/async.html
>>>> >>>>
>>>> >>>> Taariq
>>>> >>>>
>>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>>> >>>> <ja...@carmanconsulting.com> wrote:
>>>> >>>>> No worries! Thank you for your help. It helped me understand a bit
>>>> >>>>> more about how these aggregators work. However, I still don't
>>>> >>>>> understand how to take care of my problem. I guess I'm going to have
>>>> >>>>> to roll my own processor or something.
>>>> >>>>>
>>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> >>>>>> Hmmm.
>>>> >>>>>> Maybe others can help with that if it's possible, I haven't had to
>>>> >>>>>> wrestle with it.
>>>> >>>>>>
>>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous
>>>> >>>>>> and I send the response once I have it, indicating either timeout or the
>>>> >>>>>> actual response.
>>>> >>>>>>
>>>> >>>>>> Sorry I responded to your question without going back to see your
>>>> >>>>>> other posts.
>>>> >>>>>>
>>>> >>>>>> Taariq
>>>> >>>>>>
>>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> >>>>>>
>>>> >>>>>>> In my case, the originating request comes from CXF. How do I send the
>>>> >>>>>>> aggregated response back to CXF?
>>>> >>>>>>>
>>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> >>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>>> >>>>>>>>
>>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it
>>>> >>>>>>>> back into "direct:socketRequestRoute" with the header for the number of
>>>> >>>>>>>> retry attempts incremented.
>>>> >>>>>>>> If it's the response I can forward to some process.
>>>> >>>>>>>>
>>>> >>>>>>>> Taariq
>>>> >>>>>>>>
>>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> What's listening on the:
>>>> >>>>>>>>>
>>>> >>>>>>>>> to("direct:requestResponse")
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> >>>>>>>>>> Sure
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>>> >>>>>>>>>> explain using 3 routes as that's what I used.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>>> >>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>>> >>>>>>>>>> and to the socket.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> from("direct:socketRequestRoute").multicast()
>>>> >>>>>>>>>>     .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> This next route will aggregate, both requests and responses are sent
>>>> >>>>>>>>>> here as you envisaged.
>>>> >>>>>>>>>> from("direct:requestResponseAggregator")
>>>> >>>>>>>>>>     .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>> >>>>>>>>>>     .completionSize(2)
>>>> >>>>>>>>>>     .completionTimeout(5000)
>>>> >>>>>>>>>>     .to("direct:requestResponse");
>>>> >>>>>>>>>> // Here you can send the "aggregated" message, in my case it's only
>>>> >>>>>>>>>> // the response I forward unless there's a timeout, then I forward
>>>> >>>>>>>>>> // the request of course.
>>>> >>>>>>>>>> Finally the route that consumes the socket responses.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>> >>>>>>>>>> // this headerEnricher doesn't have to be a processor, you have many
>>>> >>>>>>>>>> // options to add a header.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> If that's not clear feel free to ask.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Taariq
>>>> >>>>>>>>>>
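To make the pairing in the three routes above concrete, here is a small plain-Java model of it. It is a sketch only and not Camel's aggregator: RequestResponsePairer and its method names are hypothetical, time is passed in explicitly to keep the example deterministic, and messages are plain strings. A request waits for one response with the same id (the completionSize(2) case); if none arrives before the timeout (the completionTimeout(5000) case) the request itself is handed back, for example to be retried.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of request/response pairing by correlation id; NOT Camel's aggregator. */
final class RequestResponsePairer {
    /** A request that is still waiting for its response. */
    private static final class Waiting {
        final String request;
        final long deadlineMillis;

        Waiting(String request, long deadlineMillis) {
            this.request = request;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<String, Waiting> waiting = new LinkedHashMap<>();
    private final long timeoutMillis;

    RequestResponsePairer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records an outgoing request under its correlation id. */
    synchronized void onRequest(String id, String request, long nowMillis) {
        waiting.put(id, new Waiting(request, nowMillis + timeoutMillis));
    }

    /** Returns the response if its request is still waiting, otherwise null. */
    synchronized String onResponse(String id, String response) {
        return waiting.remove(id) != null ? response : null;
    }

    /** Removes and returns the requests whose deadline has passed. */
    synchronized List<String> expire(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Waiting> it = waiting.values().iterator();
        while (it.hasNext()) {
            Waiting w = it.next();
            if (w.deadlineMillis <= nowMillis) {
                timedOut.add(w.request);
                it.remove();
            }
        }
        return timedOut;
    }
}
```

A response that arrives after its request has expired is dropped here (onResponse returns null); whether to drop, log, or forward such late responses is a design choice this sketch leaves open.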
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>> >>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>>> >>>>>>>>>>> Care to share an example? I'm not picturing it.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> >>>>>>>>>>>> Hi James
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> I did that too for what it's worth.
>>>> >>>>>>>>>>>> I send the message to a route that forwards to both the aggregator
>>>> >>>>>>>>>>>> and to the socket.
>>>> >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to the
>>>> >>>>>>>>>>>> headers and then forward to the aggregator.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Taariq
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Willem,
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Thank you for your help. I don't think this is doing exactly what I
>>>> >>>>>>>>>>>>> need, though. The real trick here is the asynchronous nature of the
>>>> >>>>>>>>>>>>> "server" on the other end of this situation. I thought about using an
>>>> >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>> >>>>>>>>>>>>> using a correlation id. The aggregator wouldn't aggregate multiple
>>>> >>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>>> >>>>>>>>>>>>> correct response with its request. Does this sound like a valid
>>>> >>>>>>>>>>>>> approach? If so, how the heck do I go about it? :)
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> James
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>>> >>>>>>>>>>>>>> Hi James,
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> Camel's async processing engine already provides what you want.
>>>> >>>>>>>>>>>>>> You can take a look at the camel-cxf code [1][2] for some examples.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> [1] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>> >>>>>>>>>>>>>> [2] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hzbar...@gmail.com> wrote:
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> Hi James,
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>> >>>>>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server
>>>> >>>>>>>>>>>>>>>> (if you have your own code that does that, you can use it too, but you'd
>>>> >>>>>>>>>>>>>>>> have to write your own Processor or Component). IIUC, your scenario is
>>>> >>>>>>>>>>>>>>>> converting a 2x in-only to a 1x in-out async MEP. You should then treat
>>>> >>>>>>>>>>>>>>>> your exchange as an async in-out and let your framework (Camel) decompose
>>>> >>>>>>>>>>>>>>>> it and compose it back again. I would not keep threads blocked so I
>>>> >>>>>>>>>>>>>>>> believe your best bet is using the Camel async messaging [2] and Futures
>>>> >>>>>>>>>>>>>>>> (look at the examples using asyncSend* and asyncCallback*). The issue is
>>>> >>>>>>>>>>>>>>>> that Camel is stateless so you'll need a correlationId, which you must
>>>> >>>>>>>>>>>>>>>> have already, and something to keep your state. A good bet would be jms
>>>> >>>>>>>>>>>>>>>> [3], or you could write your own. If you used jms you would need to use
>>>> >>>>>>>>>>>>>>>> both a correlationId and a replyTo queue.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>> >>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here. Eventually,
>>>> >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>> >>>>>>>>>>>>>>> course). So, I would need to either block the request thread, waiting
>>>> >>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>> >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>> >>>>>>>>>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> We already have a correlation id. The "protocol" requires one and the
>>>> >>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>> >>>>>>>>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>>> >>>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>>> >>>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>>> >>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>>> >>>>>>>>>>>>>>>> when they become available, and you can control that.
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> I don't think a resequencer is necessary. I don't want to guarantee
>>>> >>>>>>>>>>>>>>> the ordering. I'm mostly interested in throughput here. So, if a
>>>> >>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>> >>>>>>>>>>>>>>> it.
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>> >>>>>>>>>>>>>>>> hope this helps.
>>>> >>>>>>>>>>>>>>>> Hadrian
>>>> >>>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>> You have been very helpful. Thank you for taking the time!
>>>> >>>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> --
>>>> >>>>>>>>>>>>>> Willem
>>>> >>>>>>>>>>>>>> ----------------------------------
>>>> >>>>>>>>>>>>>> FuseSource
>>>> >>>>>>>>>>>>>> Web: http://www.fusesource.com
>>>> >>>>>>>>>>>>>> Blog: http://willemjiang.blogspot.com (English)
>>>> >>>>>>>>>>>>>> http://jnn.javaeye.com (Chinese)
>>>> >>>>>>>>>>>>>> Twitter: willemjiang
>>>> >>>>>>>>>>>>>> Weibo: willemjiang
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>
>>>> >>>>>
>>>> >>>>
>>>> >>>
>>>> >
>>>>
>>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: cib...@fusesource.com
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/