@Michael, why do you think that synchronizing will avoid NPEs? (Is it because Storm won't release the OutputCollector instance until all acks have been received?)
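For reference, here is a minimal sketch of the locking pattern Nathan describes in the quoted thread (synchronize only the collector calls, not the whole execute method). It deliberately uses a hypothetical `UnsafeCollector` stand-in backed by an `ArrayList`, not Storm's real `OutputCollector`, so it says nothing about Storm's internals or about where exactly an NPE would come from. The class and method names are my own assumptions, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a collector that is NOT thread safe.
// This is not Storm code; it only mimics "shared mutable state".
class UnsafeCollector {
    private final List<String> emitted = new ArrayList<>();

    void emit(String value) {
        emitted.add(value); // ArrayList.add is not safe under concurrent calls
    }

    int size() {
        return emitted.size();
    }
}

class SynchronizedEmitSketch {
    // Simulates many response-handler threads emitting through one collector,
    // taking the collector's lock around every call.
    static int emitFromCallbacks(int callbacks, int threads) {
        final UnsafeCollector collector = new UnsafeCollector();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        final CountDownLatch done = new CountDownLatch(callbacks);
        for (int i = 0; i < callbacks; i++) {
            final String message = "message-" + i;
            pool.execute(() -> {
                synchronized (collector) { // lock only around the collector call
                    collector.emit(message);
                }
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        synchronized (collector) {
            return collector.size();
        }
    }

    public static void main(String[] args) {
        System.out.println("emitted=" + emitFromCallbacks(10_000, 8));
    }
}
```

With the `synchronized` block in place every emit is recorded. Removing it makes the result unpredictable (lost updates or an exception thrown from inside the list), which is the general kind of failure to expect from any object that is not thread safe.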
2014-12-17 19:34 GMT+02:00 Michael Rose <[email protected]>:
>
> 1) Could you give me an example of any side effects that might occur while
> using multiple threads in case I won’t synchronise OutputCollector?
>
> You'll get NPEs.
>
> 2) Timeouts
>
> AsyncHttpClient has timeouts built into the client config.
> https://github.com/AsyncHttpClient/async-http-client/blob/036bc733ba8527d4700df048ba304f01241488ca/api/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java#L626-L636
>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Wed, Dec 17, 2014 at 10:14 AM, Nathan Leung <[email protected]> wrote:
>>
>> 1. You will get an exception. I forgot the exact type, but a call to
>> OutputCollector will typically be on the stack trace.
>>
>> 2. I'm not familiar enough with AsyncHttpClient to say. I know with
>> Apache HttpClient you can set a timeout which puts an upper bound on the
>> amount of time an HTTP request will take. Maybe you can do something
>> similar in AsyncHttpClient.
>>
>> On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman <[email protected]>
>> wrote:
>>>
>>> Hi,
>>> 1. "OutputCollector access must always be synchronized if you are using
>>> it from multiple threads"
>>> Could you give me an example of any side effects that might occur
>>> while using multiple threads in case I don’t synchronise OutputCollector?
>>>
>>> 2. I guess I didn’t formulate my question right.
>>> I do need the emit to execute after I get any response (success or fail),
>>> but what about the “never-got-response” case? The next bolt will never be
>>> emitted to and I will never be able to tell what happened to that request.
>>> Of course I can manage state before execute and after execute, but then it
>>> becomes ugly, doesn’t it?
>>>
>>> thanks.
>>>
>>> On Dec 17, 2014, at 7:02 PM, Nathan Leung <[email protected]> wrote:
>>>
>>> 1. Yes, OutputCollector access must always be synchronized if you are
>>> using it from multiple threads
>>>
>>> 2. I guess it depends. If you need the web server response before the
>>> tuple gets sent to the next bolt, then you have to put something in your
>>> error handling to emit it. If you don't need the response before sending
>>> to the next bolt, then instead of putting emit in onCompleted you can put
>>> it in execute after you set up the HTTP request.
>>>
>>> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>>
>>>> Hi Nathan,
>>>> First of all I want to thank you for responding. It's really helpful
>>>> and is increasing my knowledge of this technology.
>>>>
>>>> If you prefer I can reply back to you via the group list; I just didn't
>>>> want to annoy the user list.
>>>>
>>>> I hope it's ok if I keep asking a few more questions about my
>>>> scenario:
>>>>
>>>> From your answers I understand that I should add the emit line within the
>>>> response clauses.
>>>>
>>>> Check the following code (I tried to make it short):
>>>>
>>>>     @Override
>>>>     public Response onCompleted(Response response) throws Exception {
>>>>         basicOutputCollector.emit(new Values(pushMessage));
>>>>
>>>>         return response;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onThrowable(Throwable t) {
>>>>         basicOutputCollector.emit(new Values(pushMessage));
>>>>     }
>>>> });
>>>>
>>>> 1. If I use BaseBasicBolt I shouldn't care about acking but only
>>>> care about synchronising the basicOutputCollector.
>>>>
>>>> Is that right?
>>>>
>>>> 2. How would I handle a scenario where my web service never responds
>>>> to a request? How could I "timeout" and emit to my next bolt? I mean, if I
>>>> never get a response the emit line will never be executed.
>>>>
>>>> Thanks again,
>>>> Idan
>>>>
>>>>
>>>> 2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>>>>>
>>>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>>>>>
>>>>> 3. If you use BaseBasicBolt then your tuple will be acked immediately
>>>>> after execute and you won't have to worry about timeouts due to HTTP
>>>>> response time.
>>>>>
>>>>
>>>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>>
>>>>> @Nathan:
>>>>>
>>>>> Right now the requirement is to emit the tuple on fail and on complete
>>>>> without retrying (as I wrote, we are emitting into a Persister Bolt).
>>>>>
>>>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>>>> 2. Why should I ack manually in the final response clauses while using
>>>>> BaseBasicBolt?
>>>>>
>>>>> * "..This means that if you don't get a response your tuple will get
>>>>> timed out.."
>>>>>
>>>>> 3. I didn't get the part about how I can set a timeout for a specific
>>>>> bolt (so in my case if I never get a response I'll be able to handle this
>>>>> scenario without necessarily retrying, maybe just logging).
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> It's required that you synchronize on the output collector. If you
>>>>>> need wider throughput through this bolt and are limited by synchronizing
>>>>>> on the output collector, you will need to add more tasks.
>>>>>>
>>>>>> I would emit / ack on the completion of the HTTP request. This means
>>>>>> that if you don't get a response your tuple will get timed out (and if
>>>>>> you have reliable message handling the fail method will be called on the
>>>>>> spout). If you catch the error condition yourself you can always
>>>>>> explicitly fail the tuple using the output collector, or retry manually.
>>>>>>
>>>>>> I would reiterate that how you handle the failure states depends on
>>>>>> the requirements of your system. I'm assuming that you want to make use
>>>>>> of Storm's at-least-once semantics, but as mentioned if your requirements
>>>>>> don't demand these semantics, you can simplify your logic (and possibly
>>>>>> even use BaseBasicBolt as you were before).
>>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> 1. I'm afraid that synchronizing on basicOutputCollector will harm
>>>>>> performance. What do you think?
>>>>>>
>>>>>> 2. What do you think about emitting to the next bolt in onComplete of
>>>>>> the async response, as shown below? That means the topology will
>>>>>> continue to the next bolt only after I get a response from the external
>>>>>> service (what happens if I never get a response.. need to solve this)
>>>>>>
>>>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>
>>>>>>> You would care if you wanted to replay the tuple tree in case of
>>>>>>> failure. Consider what happens if your bolt thread fails, or if the
>>>>>>> worker process fails. Will you recover properly? Is it a requirement to
>>>>>>> recover properly in this scenario? (It might not be; sometimes it's ok
>>>>>>> to lose data.)
>>>>>>>
>>>>>>> Regarding synchronization and acking, I would ack after emit, and
>>>>>>> synchronize access to emit and ack only, not the entire execute method.
>>>>>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> "It is better to manually control the acking so that it is not
>>>>>>>> done until the tuple is fully processed"
>>>>>>>> Why would I care about that? I get the response in onCompleted or
>>>>>>>> onThrowable; if anything happens I'll take care of it there.
>>>>>>>>
>>>>>>>> btw: Apparently I do have another bolt after this call. It's a
>>>>>>>> 'Persistor' Bolt which is responsible for persisting the answer into a
>>>>>>>> datasource.
>>>>>>>> I guess that makes things even more complicated.
>>>>>>>>
>>>>>>>> Are you talking about something like this?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>
>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>         @Override
>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId +
>>>>>>>>                     "responseBody=" + response.getResponseBody());
>>>>>>>>
>>>>>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>>>>>
>>>>>>>>             return response;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>             t.printStackTrace();
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>> Check the synchronized and the basicOutputCollector.emit(new
>>>>>>>> Values(pushMessage)); additions.
>>>>>>>>
>>>>>>>> Where would you add the acking (if needed)?
>>>>>>>> thank you
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>> Sorry, I missed that. I would recommend you use IRichBolt (or
>>>>>>>>> BaseRichBolt). In BaseBasicBolt, you will ack once execute is
>>>>>>>>> finished, but this is no guarantee that your HTTP request has actually
>>>>>>>>> completed. It is better to manually control the acking so that it is
>>>>>>>>> not done until the tuple is fully processed. This will allow you to
>>>>>>>>> catch scenarios where the request fails and replay accordingly, and
>>>>>>>>> will also allow for proper "back pressure" by preventing too many
>>>>>>>>> simultaneous HTTP requests. To elaborate, in BaseBasicBolt, since you
>>>>>>>>> ack the tuple immediately, you will indicate to the spout that you are
>>>>>>>>> done processing this tree (since your bolt is at the end of the
>>>>>>>>> topology), and the spout will be able to emit additional tuples.
>>>>>>>>> However, you may still be processing tuples that you have acked,
>>>>>>>>> since you are doing HTTP requests asynchronously. This may cause you
>>>>>>>>> to have more requests in flight than you anticipated.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Nathan,
>>>>>>>>>
>>>>>>>>> Excuse me if I'm missing something here. I don't understand why I
>>>>>>>>> need to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>>>>> automatically provides anchoring and acking for us.
>>>>>>>>>
>>>>>>>>> So you are saying that although I am using BaseBasicBolt I should
>>>>>>>>> take care of the acking (and synchronizing it) because I am using
>>>>>>>>> async response logic?
>>>>>>>>>
>>>>>>>>> thanks,
>>>>>>>>> Idan.
>>>>>>>>>
>>>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> OK. :) If you use a separate thread, just make sure to wrap all
>>>>>>>>>> accesses to the OutputCollector object with synchronized. From what
>>>>>>>>>> I see it doesn't look like you use OutputCollector, but maybe, for
>>>>>>>>>> example, it's best to ack the messages in your response handler. If
>>>>>>>>>> many response handlers can be running simultaneously, then you would
>>>>>>>>>> need to synchronize the calls to ack() as they are made on the
>>>>>>>>>> OutputCollector.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure if
>>>>>>>>>> the synchronization in my Storm bolt concerns only the
>>>>>>>>>> execute method.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your response Itai. I understood you.
>>>>>>>>>>> How would you modify the above code to make sure I am
>>>>>>>>>>> synchronizing / making calls from the same thread?
>>>>>>>>>>>
>>>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>> Idan,
>>>>>>>>>>>>
>>>>>>>>>>>> Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>>>>>>> concurrent tuples.... Eventually the operating system or the NIO
>>>>>>>>>>>> buffer would exhaust its resources. You would have been better off
>>>>>>>>>>>> with throttling.
>>>>>>>>>>>>
>>>>>>>>>>>> The output collector is the object on which you "ack" or
>>>>>>>>>>>> "fail" the tuple. You probably call them from a future callback.
>>>>>>>>>>>> Make sure that all of these callbacks are called from the same
>>>>>>>>>>>> thread, or are synchronized.
>>>>>>>>>>>>
>>>>>>>>>>>> Itai
>>>>>>>>>>>>
>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>>>>
>>>>>>>>>>>> @Itai,
>>>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>>>>> have this problem?
>>>>>>>>>>>>
>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>> @Michael,
>>>>>>>>>>>> I am not sure how the OutputCollector relates to my issue?
>>>>>>>>>>>>
>>>>>>>>>>>> This is my execution code. Could this code cause any
>>>>>>>>>>>> side effects in my topology?
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>>
>>>>>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId +
>>>>>>>>>>>>                     "responseBody=" + response.getResponseBody());
>>>>>>>>>>>>             return response;
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>>>>>             t.printStackTrace();
>>>>>>>>>>>>         }
>>>>>>>>>>>>     });
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>>>>> Senior Platform Engineer, FullContact
>>>>>>>>>>>> <http://www.fullcontact.com/>
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Itai
>>>>>>>>>>>>> ------------------------------
>>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using the AsyncHttpClient library
>>>>>>>>>>>>> (https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>>>>> based on NIO channels, to get the response asynchronously without
>>>>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was wondering whether this implementation could cause any
>>>>>>>>>>>>> side effects within a Storm bolt?
>>>>>>>>>>>>>
>>>>>>>>>>>>> thank you.
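On the "never-got-response" case raised earlier in this thread: one generic way to guarantee that something is always emitted is to race the response callback against a timer and let whichever finishes first decide the outcome. The sketch below is only an illustration under stated assumptions. It uses a plain `CompletableFuture` as a stand-in for the HTTP call instead of AsyncHttpClient, and the names `TimeoutEmitSketch` and `outcomeToEmit` are hypothetical. In the real bolt the client's own configured timeouts (which Michael links to) may already surface the timeout through onThrowable, in which case this extra timer would not be needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimeoutEmitSketch {
    // One shared daemon timer thread, so the sketch never keeps the JVM alive.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-timer");
                t.setDaemon(true);
                return t;
            });

    // Returns a future that always completes with exactly one of
    // "completed", "failed" or "timed-out". In a bolt, the value would
    // decide what gets emitted to the next bolt.
    static CompletableFuture<String> outcomeToEmit(CompletableFuture<String> response,
                                                   long timeoutMillis) {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        // The request decides the outcome if it finishes (either way) first.
        response.whenComplete((body, error) ->
                outcome.complete(error == null ? "completed" : "failed"));
        // Otherwise the timer decides. complete() does nothing once a value is set,
        // so only the first of the two paths wins.
        TIMER.schedule(() -> { outcome.complete("timed-out"); },
                timeoutMillis, TimeUnit.MILLISECONDS);
        return outcome;
    }

    public static void main(String[] args) {
        CompletableFuture<String> answered = CompletableFuture.completedFuture("200 OK");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("connection reset"));
        CompletableFuture<String> silent = new CompletableFuture<>(); // never completes

        System.out.println(outcomeToEmit(answered, 200).join());
        System.out.println(outcomeToEmit(failed, 200).join());
        System.out.println(outcomeToEmit(silent, 200).join());
    }
}
```

The callback attached to the returned future would still have to call the collector inside a synchronized block, as discussed above, because it can run on either the response thread or the timer thread.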
