1. You will get an exception. I forgot the exact type, but a call to OutputCollector will typically be on the stack trace.
2. I'm not familiar enough with AsyncHttpClient to say. I know that with Apache HttpClient you can set a timeout, which puts an upper bound on the amount of time an HTTP request will take. Maybe you can do something similar in AsyncHttpClient.

On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman wrote:
>
> Hi,
>
> 1. OutputCollector access must always be synchronized if you are using it from multiple threads
> Could you give me an example of the side effects that might occur when using multiple threads if I don't synchronize the OutputCollector?
>
> 2. I guess I didn't formulate my question right.
> I do need the emit to happen after I get any response (success or failure), but what about the "never-got-response" case? Nothing will ever be emitted to the next bolt, and I will never be able to tell what happened to that request. Of course I can manage state before and after execute, but then it becomes ugly, doesn't it?
>
> Thanks.
>
> On Dec 17, 2014, at 7:02 PM, Nathan Leung wrote:
>
> 1. Yes, OutputCollector access must always be synchronized if you are using it from multiple threads.
>
> 2. I guess it depends. If you need the web server response before the tuple gets sent to the next bolt, then you have to put something in your error handling to emit it. If you don't need the response before sending to the next bolt, then instead of putting emit in onCompleted you can put it in execute after you set up the HTTP request.
>
> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman wrote:
>>
>> Hi Nathan,
>> First of all, I want to thank you for responding. It's really helpful and is increasing my knowledge of this technology.
>>
>> If you prefer, I can reply to you via the group list; I just didn't want to annoy the user list.
>>
>> Hope it's OK if I keep asking a few more questions about my scenario.
>>
>> From your answers I understand that I should add the emit line within the response callbacks. Check the following code (I tried to keep it short):
>>
>>     @Override
>>     public Response onCompleted(Response response) throws Exception {
>>         basicOutputCollector.emit(new Values(pushMessage));
>>         return response;
>>     }
>>
>>     @Override
>>     public void onThrowable(Throwable t) {
>>         basicOutputCollector.emit(new Values(pushMessage));
>>     }
>> });
>>
>> 1. If I use BaseBasicBolt, I shouldn't care about acking, only about synchronizing the basicOutputCollector. Is that right?
>>
>> 2. How would I handle a scenario where my web service never responds to a request? How could I "time out" and emit to my next bolt? I mean, if I never get a response, the emit line will never be executed.
>>
>> Thanks again,
>> Idan
>>
>> 2014-12-17 18:02 GMT+02:00 Nathan Leung:
>>>
>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt.
>>>
>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually.
>>>
>>> 3. If you use BaseBasicBolt, then your tuple will be acked immediately after execute, and you won't have to worry about timeouts due to HTTP response time.
>>>
>>
>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman wrote:
>>>
>>> @Nathan:
>>>
>>> Right now the requirement is to emit the tuple on failure and on completion without retrying (as I wrote, we emit into a Persister bolt).
>>>
>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>> 2. Why should I ack manually in the final response callbacks while using BaseBasicBolt?
>>>
>>> * "..This means that if you don't get a response your tuple will get timed out.."
>>>
>>> 3. I didn't get the part about how I can set a timeout for a specific bolt (so that, in my case, if I never get a response I'll be able to handle this scenario without necessarily retrying, maybe just logging).
>>>
>>> Thank you.
>>>
>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung:
>>>>
>>>> It's required that you synchronize on the output collector. If you need more throughput through this bolt and are limited by synchronizing on the output collector, you will need to add more tasks.
>>>>
>>>> I would emit / ack on the completion of the HTTP request. This means that if you don't get a response your tuple will get timed out (and if you have reliable message handling, the fail method will be called on the spout). If you catch the error condition yourself, you can always explicitly fail the tuple using the output collector, or retry manually.
>>>>
>>>> I would reiterate that how you handle the failure states depends on the requirements of your system. I'm assuming that you want to make use of Storm's at-least-once semantics, but as mentioned, if your requirements don't demand these semantics, you can simplify your logic (and possibly even use BaseBasicBolt as you were before).
>>>>
>>>
>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman wrote:
>>>>
>>>> 1. I'm afraid that synchronizing on basicOutputCollector will hurt performance. What do you think?
>>>>
>>>> 2. What do you think about emitting to the next bolt in onCompleted of the async response, as shown below? That means the topology will continue to the next bolt only after I get a response from the external service (what happens if I never get a response... I need to solve this).
>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" wrote:
>>>>
>>>>> You would care if you wanted to replay the tuple tree in case of failure.
>>>>> Consider what happens if your bolt thread fails, or if the worker process fails. Will you recover properly? Is it a requirement to recover properly in this scenario? (It might not be; sometimes it's OK to lose data.)
>>>>>
>>>>> Regarding synchronization and acking, I would ack after emit, and synchronize access to emit and ack only, not the entire execute method. Probably something like synchronized(basicOutputCollector) { ... }.
>>>>>
>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman wrote:
>>>>>
>>>>>> "It is better to manually control the acking so that it is not done until the tuple is fully processed"
>>>>>> Why would I care about that? I get the response in onCompleted or onThrowable; if anything happens, I'll take care of it there.
>>>>>>
>>>>>> BTW: Apparently I do have another bolt after this call. It's a 'Persistor' bolt, which is responsible for persisting the answer into a data source. I guess that makes things even more complicated.
>>>>>>
>>>>>> Are you talking about something like this?
>>>>>>
>>>>>>     @Override
>>>>>>     public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>         PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>         final String messageId = pushMessage.getMessageId();
>>>>>>         asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>             @Override
>>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>>                 String innerMessageId = messageId;
>>>>>>                 System.out.println("messageId=" + innerMessageId + " responseBody=" + response.getResponseBody());
>>>>>>
>>>>>>                 basicOutputCollector.emit(new Values(pushMessage));
>>>>>>
>>>>>>                 return response;
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void onThrowable(Throwable t) {
>>>>>>                 t.printStackTrace();
>>>>>>             }
>>>>>>         });
>>>>>>     }
>>>>>>
>>>>>> Note the synchronized keyword and the basicOutputCollector.emit(new Values(pushMessage)); addition.
>>>>>>
>>>>>> Where would you add the acking (if needed)?
>>>>>> Thank you
>>>>>>
>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung:
>>>>>>>
>>>>>>> Sorry, I missed that. I would recommend you use IRichBolt (or BaseRichBolt). In BaseBasicBolt, you will ack once execute is finished, but this is no guarantee that your HTTP request has actually completed. It is better to manually control the acking so that it is not done until the tuple is fully processed. This will allow you to catch scenarios where the request fails and replay accordingly, and will also allow for proper "back pressure" by preventing too many simultaneous HTTP requests. To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you will indicate to the spout that you are done processing this tree (since your bolt is at the end of the topology), and the spout will be able to emit additional tuples. However, you may still be processing tuples that you have acked, since you are doing HTTP requests asynchronously. This may cause you to have more requests in flight than you anticipated.
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman wrote:
>>>>>>
>>>>>>> Hi Nathan,
>>>>>>>
>>>>>>> Excuse me if I'm missing something here. I don't understand why I need to ack at all. I am extending BaseBasicBolt, and we know that it automatically provides anchoring and acking for us.
>>>>>>>
>>>>>>> So you're saying that although I am using BaseBasicBolt, I should take care of the acking (and synchronize it) because I am using async response logic?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Idan.
>>>>>>>
>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung:
>>>>>>>>
>>>>>>>> OK. :) If you use a separate thread, just make sure to wrap all accesses to the OutputCollector object with synchronized. From what I see it doesn't look like you use OutputCollector, but it may be best, for example, to ack the messages in your response handler. If many response handlers can be running simultaneously, then you would need to synchronize the calls to ack() as they are made on the OutputCollector.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman wrote:
>>>>>>>
>>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether the synchronization in my Storm bolt concerns only the execute method.
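The pattern recommended in this thread for BaseRichBolt (synchronize only the collector calls made from the response callbacks, not the whole execute method; emit before ack; fail the tuple explicitly on error) can be sketched without Storm or AsyncHttpClient on the classpath. This is a minimal, hypothetical sketch: `Collector` stands in for Storm's non-thread-safe OutputCollector (only the emit/ack/fail names mirror it), tuple IDs are plain longs, and `CompletableFuture` callbacks on an 8-thread pool stand in for onCompleted/onThrowable running on the HTTP client's I/O threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: Collector is a hypothetical stand-in for Storm's OutputCollector,
// and the thread pool stands in for AsyncHttpClient's I/O callback threads.
class CallbackSyncSketch {
    // Deliberately NOT thread safe (plain ArrayLists), like the real collector.
    static class Collector {
        final List<String> emitted = new ArrayList<>();
        final List<Long> acked = new ArrayList<>();
        final List<Long> failed = new ArrayList<>();

        void emit(String value) { emitted.add(value); }
        void ack(long tupleId) { acked.add(tupleId); }
        void fail(long tupleId) { failed.add(tupleId); }
    }

    final Collector collector = new Collector();

    // Success callback: lock only around the collector calls, emit first, then ack.
    void onCompleted(long tupleId, String responseBody) {
        synchronized (collector) {
            collector.emit(responseBody);
            collector.ack(tupleId);
        }
    }

    // Error callback: fail explicitly so a reliable spout can replay the tuple.
    void onThrowable(long tupleId, Throwable t) {
        synchronized (collector) {
            collector.fail(tupleId);
        }
    }

    // Simulates `requests` async calls completing concurrently; every 10th one errors.
    static CallbackSyncSketch simulate(int requests) {
        CallbackSyncSketch bolt = new CallbackSyncSketch();
        ExecutorService ioThreads = Executors.newFixedThreadPool(8);
        CompletableFuture<?>[] calls = new CompletableFuture<?>[requests];
        for (int i = 0; i < requests; i++) {
            final long id = i;
            calls[i] = CompletableFuture
                    .supplyAsync(() -> {
                        if (id % 10 == 0) throw new IllegalStateException("simulated HTTP error");
                        return "body-" + id;
                    }, ioThreads)
                    .whenComplete((body, err) -> {
                        if (err != null) bolt.onThrowable(id, err);
                        else bolt.onCompleted(id, body);
                    });
        }
        // Wait for every callback; the simulated failures are expected, so swallow them here.
        CompletableFuture.allOf(calls).exceptionally(e -> null).join();
        ioThreads.shutdown();
        return bolt;
    }
}
```

In a real BaseRichBolt the locked section would presumably call `collector.emit(input, new Values(...))` followed by `collector.ack(input)` on the OutputCollector saved in `prepare`, and `collector.fail(input)` on the error path. Locking just these calls, rather than declaring execute synchronized, keeps the lock held only for the brief collector calls.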
>>>>>>>>
>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung:
>>>>>>>>>
>>>>>>>>> You should look up the synchronized keyword in Java:
>>>>>>>>>
>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman wrote:
>>>>>>>>
>>>>>>>>> Thanks for your response, Itai. I understood you.
>>>>>>>>> How would you modify the above code to make sure I am synchronizing, or making the calls from the same thread?
>>>>>>>>>
>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel:
>>>>>>>>>>
>>>>>>>>>> Idan,
>>>>>>>>>>
>>>>>>>>>> Consider that you have 1000 concurrent tuples... and the spout does not throttle traffic. It means that the last bolt would be handling 1000 concurrent requests. Now consider that you have 100,000 concurrent tuples... Eventually the operating system or the NIO buffer would exhaust its resources. You would have been better off with throttling.
>>>>>>>>>>
>>>>>>>>>> The output collector is the object you use to "ack" or "fail" the tuple. You probably call them from a future callback. Make sure that all of these callbacks are called from the same thread, or are synchronized.
>>>>>>>>>>
>>>>>>>>>> Itai
>>>>>>>>>>
>>>>>>>>>> ------------------------------
>>>>>>>>>> *From:* Idan Fridman
>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if it is out of resources. So you should consider using max-spout-pending for spout-level throttling.
>>>>>>>>>>
>>>>>>>>>> @Itai,
>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't have this problem?
>>>>>>>>>>
>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when your NIO response workers handle the returned requests, as OutputCollector is not thread safe.
>>>>>>>>>>
>>>>>>>>>> @Michael,
>>>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>>>
>>>>>>>>>> This is my execution code. Could this code cause any side effects in my topology?
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>         PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>>         final String messageId = pushMessage.getMessageId();
>>>>>>>>>>         asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>>             @Override
>>>>>>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>>>>>>                 String innerMessageId = messageId;
>>>>>>>>>>                 System.out.println("messageId=" + innerMessageId + " responseBody=" + response.getResponseBody());
>>>>>>>>>>                 return response;
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             @Override
>>>>>>>>>>             public void onThrowable(Throwable t) {
>>>>>>>>>>                 t.printStackTrace();
>>>>>>>>>>             }
>>>>>>>>>>         });
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when your NIO response workers handle the returned requests, as OutputCollector is not thread safe.
>>>>>>>>>>
>>>>>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if it is out of resources. So you should consider using max-spout-pending for spout-level throttling.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Itai
>>>>>>>>>>> ------------------------------
>>>>>>>>>>> *From:* Idan Fridman
>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>
>>>>>>>>>>> I am using the AsyncHttpClient library (https://github.com/AsyncHttpClient/async-http-client), which is based on NIO channels, to get the response asynchronously without allocating a thread for each request.
>>>>>>>>>>>
>>>>>>>>>>> I was wondering whether this implementation could cause any side effects within a Storm bolt?
>>>>>>>>>>>
>>>>>>>>>>> Thank you.
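Itai's and Nathan's throttling points can be made concrete. In Storm the spout-level cap is `topology.max.spout.pending` (settable via `Config.setMaxSpoutPending(int)`), and it only bounds in-flight HTTP requests if tuples are acked after the request finishes, not when execute returns. A bolt-local complement, not something proposed in the thread, is a semaphore that bounds outstanding requests per task. This sketch is hypothetical: `InFlightLimiter` and its `demo` method are illustrative names, and a sleeping `CompletableFuture` stands in for an AsyncHttpClient call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch only: caps the number of outstanding async requests inside one bolt task.
class InFlightLimiter {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller (e.g. execute()) until a slot is free, then starts the request.
    // The slot is returned only when the request finishes, with a response or an error.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> startRequest) {
        permits.acquireUninterruptibly();
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<T> request;
        try {
            request = startRequest.get();
        } catch (RuntimeException e) {
            release();
            throw e;
        }
        return request.whenComplete((result, err) -> release());
    }

    private void release() {
        inFlight.decrementAndGet(); // decrement before releasing so the count never exceeds the cap
        permits.release();
    }

    int peak() { return peak.get(); }

    // Hypothetical demo: `requests` fake calls of ~5 ms each on a 16-thread pool.
    static int demo(int maxInFlight, int requests) {
        InFlightLimiter limiter = new InFlightLimiter(maxInFlight);
        ExecutorService pool = Executors.newFixedThreadPool(16);
        List<CompletableFuture<Integer>> calls = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            final int n = i;
            calls.add(limiter.submit(() -> CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5); // pretend to wait for the remote service
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return n;
            }, pool)));
        }
        calls.forEach(CompletableFuture::join);
        pool.shutdown();
        return limiter.peak();
    }
}
```

Because `submit` blocks the calling thread once the cap is reached, a bolt that calls it from execute stops taking new tuples, which is the back pressure described in the thread. The flip side is that a request that never completes holds its permit forever, so this approach assumes every request also has a timeout.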

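For the "never-got-response" case, Nathan's suggestion is to put an upper bound on request time. AsyncHttpClient's client configuration does offer a request timeout (the builder method name differs between versions, so check the documentation for the version in use). Independently of the HTTP client, the idea can be sketched with the JDK's `CompletableFuture.orTimeout`, assuming Java 9 or later: every request ends in exactly one of three outcomes, so the no-answer case can be logged, emitted to the Persister bolt, or failed instead of silently waiting for the tuple to time out. `ResponseDeadline` and `Outcome` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only (Java 9+): gives an async request a deadline so that the
// "never-got-response" case ends up in a handler instead of hanging forever.
class ResponseDeadline {
    enum Outcome { COMPLETED, FAILED, TIMED_OUT }

    static CompletableFuture<Outcome> withDeadline(CompletableFuture<String> request,
                                                   long timeoutMs) {
        return request
                // Completes `request` exceptionally with TimeoutException if no answer arrives in time.
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .handle((body, err) -> {
                    if (err == null) {
                        return Outcome.COMPLETED; // e.g. emit, then ack
                    }
                    Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                            ? err.getCause() : err;
                    // e.g. log it, then emit to the Persister bolt or fail the tuple
                    return (cause instanceof TimeoutException) ? Outcome.TIMED_OUT : Outcome.FAILED;
                });
    }
}
```

Here `orTimeout` mutates the original future, so any other code holding `request` also sees the timeout; with AsyncHttpClient's own request timeout, the equivalent handling would presumably live in onThrowable.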