1) Could you give me an example of any side effects that might occur while using multiple threads in case I won’t synchronise OutputCollector?
You'll get NPEs.

2) Timeouts

AsyncHttpClient has timeouts built into the client config.
https://github.com/AsyncHttpClient/async-http-client/blob/036bc733ba8527d4700df048ba304f01241488ca/api/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java#L626-L636

Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
[email protected]

On Wed, Dec 17, 2014 at 10:14 AM, Nathan Leung <[email protected]> wrote:
>
> 1. You will get an exception. I forgot the exact type, but a call to
> OutputCollector will typically be on the stack trace.
>
> 2. I'm not familiar enough with AsyncHttpClient to say. I know with
> Apache HttpClient you can set a timeout which puts an upper bound on the
> amount of time an HTTP request will take. Maybe you can do something
> similar in AsyncHttpClient.
>
> On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman <[email protected]>
> wrote:
>>
>> Hi,
>> 1. OutputCollector access must always be synchronized if you are using
>> it from multiple threads
>> Could you give me an example of any side effects that might occur while
>> using multiple threads in case I won’t synchronise OutputCollector?
>>
>> 2. I guess I didn’t formulate my question right.
>> I do need the emit execution after I get any response (success or fail),
>> but what about the “never-got-response” case? The next bolt will never be
>> emitted to and I will never be able to tell what happened to that request.
>> Of course I can manage state before execute and after execute, but then it
>> becomes ugly, doesn’t it?
>>
>> thanks.
>>
>> On Dec 17, 2014, at 7:02 PM, Nathan Leung <[email protected]> wrote:
>>
>> 1. Yes OutputCollector access must always be synchronized if you are
>> using it from multiple threads
>>
>> 2. I guess it depends. If you need the web server response before the
>> tuple gets sent to the next bolt, then you have to put something in your
>> error handling to emit it.
>> If you don't need the response before sending
>> to the next bolt, then instead of putting emit in onCompleted you can put
>> it in execute after you set up the HTTP request.
>>
>> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman <[email protected]>
>> wrote:
>>>
>>> Hi Nathan,
>>> First of all I want to thank you for responding. It's really helpful
>>> and is increasing my knowledge of this technology.
>>>
>>> If you prefer I can reply back to you via the group list; I just didn't
>>> want to annoy the user list.
>>>
>>> Hope it's ok if I keep asking a few more questions about my scenario:
>>>
>>> By your answers I understand that I should add the emit line within the
>>> response clauses.
>>>
>>> Check the following code (I tried to make it short):
>>>
>>>     @Override
>>>     public Response onCompleted(Response response) throws Exception {
>>>         basicOutputCollector.emit(new Values(pushMessage));
>>>
>>>         return response;
>>>     }
>>>
>>>     @Override
>>>     public void onThrowable(Throwable t) {
>>>         basicOutputCollector.emit(new Values(pushMessage));
>>>     }
>>> });
>>>
>>> 1. If I use BaseBasicBolt I shouldn't care about acking but only
>>> care about synchronising the basicOutputCollector.
>>>
>>> Is that right?
>>>
>>> 2. How would I take care of a scenario where my web service never responds to
>>> a request? How could I "timeout" and emit to my next bolt? I mean, if I
>>> never get a response the emit line will never be executed.
>>>
>>> Thanks again,
>>> Idan
>>>
>>>
>>> 2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected]>:
>>>>
>>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>>>>
>>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>>>>
>>>> 3. If you use BaseBasicBolt then your tuple will be acked immediately
>>>> after execute and you won't have to worry about timeouts due to HTTP
>>>> response time.
>>>>
>>>
>>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>>
>>>> @Nathan:
>>>>
>>>> Right now the demand is to emit the tuple onFail and onComplete without
>>>> retrying (as I wrote, we are emitting into a Persister Bolt)
>>>>
>>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>>> 2. Why should I ack manually in the final response clauses while using
>>>> BaseBasicBolt?
>>>>
>>>>
>>>> * "..This means that if you don't get a response your tuple will get
>>>> timed out.."
>>>>
>>>>
>>>> 3. I didn't get the part about how I can set a timeout for a specific bolt (so in
>>>> my case, if I never get a response, I'll be able to handle this scenario without
>>>> necessarily retrying, maybe just logging)
>>>>
>>>> Thank you.
>>>>
>>>>
>>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> it's required that you synchronize on the output collector. If you
>>>>> need wider throughput through this bolt and are limited by synchronizing on
>>>>> the output collector, you will need to add more tasks.
>>>>>
>>>>> I would emit / ack on the completion of the HTTP request. This means
>>>>> that if you don't get a response your tuple will get timed out (and if you
>>>>> have reliable message handling the fail method will be called on the
>>>>> spout). If you catch the error condition yourself you can always
>>>>> explicitly fail the tuple using the output collector, or retry manually.
>>>>>
>>>>> I would reiterate that how you handle the failure states depends on
>>>>> the requirements of your system. I'm assuming that you want to make use of
>>>>> storm's at least once semantics, but as mentioned if your requirements
>>>>> don't demand these semantics, you can simplify your logic (and possibly
>>>>> even use BaseBasicBolt as you were before).
>>>>>
>>>>
>>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>>
>>>>> 1. I'm afraid that synchronizing on basicOutputCollector will harm
>>>>> performance. What do you think?
>>>>>
>>>>> 2. What do you think about emitting at onComplete of the async response
>>>>> to the next bolt as shown below? That means the topology will continue to
>>>>> the next bolt only after I get a response from the external service (what
>>>>> happens if I never get a response? I need to solve this)
>>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>
>>>>>> You would care if you wanted to replay the tuple tree in case of
>>>>>> failure. Consider what happens if your bolt thread fails, or if the worker
>>>>>> process fails. Will you recover properly? Is it a requirement to recover
>>>>>> properly in this scenario? (it might not be, sometimes it's ok to lose
>>>>>> data).
>>>>>>
>>>>>> Regarding synchronization and acking, I would ack after emit, and
>>>>>> synchronize access to emit and ack only, not the entire execute method.
>>>>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> " It is better to manually control the acking so that it is not
>>>>>>> done until the tuple is fully processed"
>>>>>>> Why would I care about that? I get the response in onCompleted or
>>>>>>> onThrowable; if anything happens I'll take care of it there.
>>>>>>>
>>>>>>> btw: Apparently I do have another bolt after this call. It's a
>>>>>>> 'Persistor' Bolt which is responsible for persisting the answer into a
>>>>>>> datasource.
>>>>>>> I guess that makes things even more complicated:
>>>>>>>
>>>>>>> Are you talking about something like this?
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>
>>>>>>>         PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>         final String messageId = pushMessage.getMessageId();
>>>>>>>         asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>             @Override
>>>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>>>                 String innerMessageId = messageId;
>>>>>>>                 System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>
>>>>>>>                 basicOutputCollector.emit(new Values(pushMessage));
>>>>>>>
>>>>>>>                 return response;
>>>>>>>             }
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public void onThrowable(Throwable t) {
>>>>>>>                 t.printStackTrace();
>>>>>>>             }
>>>>>>>         });
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>> Check the synchronized and the basicOutputCollector.emit(new
>>>>>>> Values(pushMessage)); additions.
>>>>>>>
>>>>>>> Where would you add the acking (if needed)?
>>>>>>> thank you
>>>>>>>
>>>>>>>
>>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>
>>>>>>>> Sorry, I missed that. I would recommend you use IRichBolt (or
>>>>>>>> BaseRichBolt). In BaseBasicBolt, you will ack once execute is finished,
>>>>>>>> but this is no guarantee that your HTTP Request has actually completed. It
>>>>>>>> is better to manually control the acking so that it is not done until the
>>>>>>>> tuple is fully processed. This will allow you to catch scenarios where the
>>>>>>>> request fails and replay accordingly, and will also allow for proper "back
>>>>>>>> pressure" by preventing too many simultaneous HTTP requests. To elaborate,
>>>>>>>> in BaseBasicBolt, since you ack the tuple immediately, you will indicate to
>>>>>>>> the spout that you are done processing this tree (since your bolt is at the
>>>>>>>> end of the topology), and the spout will be able to emit additional
>>>>>>>> tuples. However, you may still be processing tuples that you have acked,
>>>>>>>> since you are doing HTTP requests asynchronously. This may cause you to
>>>>>>>> have more requests in flight than you anticipated.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Nathan,
>>>>>>>>
>>>>>>>> Excuse me if I miss something here. I don't understand why I need to
>>>>>>>> ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>>>> automatically provides anchoring
>>>>>>>> and acking for us.
>>>>>>>>
>>>>>>>> So you are saying that although I am using BaseBasicBolt I should take care
>>>>>>>> of the acking (and synchronizing it) because I am using async response
>>>>>>>> logic?
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> Idan.
>>>>>>>>
>>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>> OK. :) If you use a separate thread, just make sure to wrap all
>>>>>>>>> accesses to the OutputCollector object with synchronized. From what I see
>>>>>>>>> it doesn't look like you use OutputCollector, but maybe, for example, it's
>>>>>>>>> best to ack the messages in your response handler. If many response
>>>>>>>>> handlers can be running simultaneously, then you would need to synchronize
>>>>>>>>> the calls to ack() as they are made on the OutputCollector.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Oh, I know what synchronized is all about.
>>>>>>>>> I just wasn't sure whether
>>>>>>>>> the synchronization in my storm bolt concerns only the
>>>>>>>>> execute method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>>>>
>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>>>
>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your response Itai. I understood you.
>>>>>>>>>> How would you modify the above code to make sure I am synchronizing/making
>>>>>>>>>> calls from the same thread?
>>>>>>>>>>
>>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>> Idan,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000 concurrent
>>>>>>>>>>> tuples.... Eventually the operating system or the NIO buffer would exhaust
>>>>>>>>>>> its resources. You would have been better off with throttling.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The output collector is the object on which you "ack" or
>>>>>>>>>>> "fail" the tuple. You probably call them from a future callback. Make sure
>>>>>>>>>>> that all of these callbacks are called from the same thread, or are
>>>>>>>>>>> synchronized.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Itai
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------
>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>> if it is out of resources. So you should consider using max-spout-pending
>>>>>>>>>>> for spout level throttling.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Itai,
>>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>>>> have this problem?
>>>>>>>>>>>
>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>
>>>>>>>>>>> @Michael,
>>>>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>>>>
>>>>>>>>>>> This is my execution code. Could this code cause any
>>>>>>>>>>> side effects in my topology?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>
>>>>>>>>>>>         PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>>>         final String messageId = pushMessage.getMessageId();
>>>>>>>>>>>         asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public Response onCompleted(Response response) throws Exception {
>>>>>>>>>>>                 String innerMessageId = messageId;
>>>>>>>>>>>                 System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>>>>                 return response;
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public void onThrowable(Throwable t) {
>>>>>>>>>>>                 t.printStackTrace();
>>>>>>>>>>>             }
>>>>>>>>>>>         });
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>>>> Senior Platform Engineer, FullContact
>>>>>>>>>>> <http://www.fullcontact.com/>
>>>>>>>>>>> [email protected]
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>> if it is out of resources. So you should consider using max-spout-pending
>>>>>>>>>>>> for spout level throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Itai
>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>>
>>>>>>>>>>>> I am using the AsyncHttpClient library (
>>>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>>>> based on NIO channels, to get the response asynchronously while not
>>>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>>>
>>>>>>>>>>>> I was wondering whether this implementation could cause any side effects
>>>>>>>>>>>> within a Storm bolt?
>>>>>>>>>>>>
>>>>>>>>>>>> thank you.
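
For readers who want the pattern discussed in this thread in one place, here is a minimal sketch. It is not Storm or AsyncHttpClient code: SketchCollector is a hypothetical stand-in for OutputCollector (not thread safe, as Michael points out), a CompletableFuture stands in for the async HTTP response, and the timeout is simulated with CompletableFuture.orTimeout (Java 9 or later). The names dispatch and classify are made up for illustration. It shows the two points the replies agree on: guard every collector call made from a callback thread with synchronized, and make sure something is emitted even when no response ever arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Storm's OutputCollector; deliberately NOT thread safe.
final class SketchCollector {
    private final List<String> emitted = new ArrayList<>();

    void emit(String value) {
        emitted.add(value);
    }

    // Returns a copy so callers cannot mutate the internal list.
    List<String> emitted() {
        return new ArrayList<>(emitted);
    }
}

final class AsyncEmitSketch {

    // Emits exactly one value per request: on success, on failure, or when no
    // response arrives within timeoutMillis. The CompletableFuture is an
    // assumption standing in for the HTTP client's async response.
    static CompletableFuture<Void> dispatch(String messageId,
                                            CompletableFuture<String> response,
                                            long timeoutMillis,
                                            SketchCollector collector) {
        return response
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // Java 9+
                .<Void>handle((body, error) -> {
                    String outcome = classify(error);
                    // The callback may run on any thread, so guard the collector.
                    synchronized (collector) {
                        collector.emit(messageId + ":" + outcome);
                    }
                    return null;
                });
    }

    private static String classify(Throwable error) {
        if (error == null) {
            return "ok";
        }
        // Unwrap the wrapper that dependent stages may add around the cause.
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
        return (cause instanceof TimeoutException) ? "timeout" : "failed";
    }

    public static void main(String[] args) {
        SketchCollector collector = new SketchCollector();
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection refused"));

        CompletableFuture<Void> a =
                dispatch("m1", CompletableFuture.completedFuture("200"), 200, collector);
        CompletableFuture<Void> b = dispatch("m2", failed, 200, collector);
        // This request never gets a response; only the timeout completes it.
        CompletableFuture<Void> c = dispatch("m3", new CompletableFuture<>(), 200, collector);

        CompletableFuture.allOf(a, b, c).join();
        synchronized (collector) {
            System.out.println(collector.emitted());
        }
    }
}
```

In a real bolt the same shape would presumably apply to the onCompleted and onThrowable handlers shown earlier in the thread, with the client-side timeout Michael mentions taking the place of orTimeout. Whether a BaseRichBolt should also ack or fail inside the same synchronized block depends on the delivery guarantees you need, as Nathan notes.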
