@Nathan: Right now the requirement is to emit the tuple in both onFail and onComplete, without retrying (as I wrote, we are emitting into a Persister bolt).
1. If I am using BaseBasicBolt, can I still ack manually?
2. Why should I ack manually in the final response callbacks while using BaseBasicBolt?
* "..This means that if you don't get a response your tuple will get timed out.."
3. I didn't get how I can set a timeout for a specific bolt (so in my case, if I never get a response, I'll be able to handle this scenario without necessarily retrying, maybe just logging).

Thank you.

2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>
> it's required that you synchronize on the output collector. If you need
> wider throughput through this bolt and are limited by synchronizing on the
> output collector, you will need to add more tasks.
>
> I would emit / ack on the completion of the HTTP request. This means that
> if you don't get a response your tuple will get timed out (and if you have
> reliable message handling the fail method will be called on the spout). If
> you catch the error condition yourself you can always explicitly fail the
> tuple using the output collector, or retry manually.
>
> I would reiterate that how you handle the failure states depends on the
> requirements of your system. I'm assuming that you want to make use of
> storm's at least once semantics, but as mentioned if your requirements
> don't demand these semantics, you can simplify your logic (and possibly
> even use BaseBasicBolt as you were before).
>
> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]> wrote:
>>
>> 1. I'm afraid that synchronizing on basicOutputCollector will harm
>> performance. What do you think?
>>
>> 2. What do you think about emitting at onComplete of the async response to
>> the next bolt as shown below? That means the topology will continue to the
>> next bolt only after I get a response from the external service (what happens
>> if I never get a response..
>> need to solve this)
>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>
>>> You would care if you wanted to replay the tuple tree in case of
>>> failure. Consider what happens if your bolt thread fails, or if the worker
>>> process fails. Will you recover properly? Is it a requirement to recover
>>> properly in this scenario? (it might not be, sometimes it's ok to lose
>>> data).
>>>
>>> Regarding synchronization and acking, I would ack after emit, and
>>> synchronize access to emit and ack only, not the entire execute method.
>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>
>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>> wrote:
>>>
>>>> " It is better to manually control the acking so that it is not done
>>>> until the tuple is fully processed"
>>>> Why would I care about that? I get the response in onCompleted or
>>>> onThrowable; if anything happens I'll take care of it there.
>>>>
>>>> btw: Apparently I do have another bolt after this call. It's a
>>>> 'Persistor' bolt which is responsible for persisting the answer into a
>>>> datasource. I guess that makes things even more complicated:
>>>>
>>>> Are you talking about something like this?
>>>>
>>>> @Override
>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>
>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>     final String messageId = pushMessage.getMessageId();
>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>         @Override
>>>>         public Response onCompleted(Response response) throws Exception {
>>>>             String innerMessageId = messageId;
>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>
>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>
>>>>             return response;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onThrowable(Throwable t) {
>>>>             t.printStackTrace();
>>>>         }
>>>>     });
>>>> }
>>>>
>>>>
>>>> Check the synchronized and the basicOutputCollector.emit(new
>>>> Values(pushMessage)); additions.
>>>>
>>>> Where would you add the acking (if needed)?
>>>> Thank you.
>>>>
>>>>
>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> Sorry, I missed that. I would recommend you use IRichBolt (or
>>>>> BaseRichBolt). In BaseBasicBolt, you will ack once execute is finished,
>>>>> but this is no guarantee that your HTTP Request has actually completed.
>>>>> It is better to manually control the acking so that it is not done until
>>>>> the tuple is fully processed. This will allow you to catch scenarios
>>>>> where the request fails and replay accordingly, and will also allow for
>>>>> proper "back pressure" by preventing too many simultaneous HTTP requests.
>>>>> To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you
>>>>> will indicate to the spout that you are done processing this tree (since
>>>>> your bolt is at the end of the topology), and the spout will be able to
>>>>> emit additional tuples.
>>>>> However, you may still be processing tuples that you have acked,
>>>>> since you are doing HTTP requests asynchronously. This may cause you to
>>>>> have more requests in flight than you anticipated.
>>>>
>>>>
>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Nathan,
>>>>>
>>>>> Excuse me if I missed something here. I don't understand why I need to
>>>>> ack at all. I am extending BaseBasicBolt, and we know that it
>>>>> automatically provides anchoring and acking for us.
>>>>>
>>>>> So you are saying that although I am using BaseBasicBolt, I should take
>>>>> care of the acking (and synchronizing it) because I am using async
>>>>> response logic?
>>>>>
>>>>> thanks,
>>>>> Idan.
>>>>>
>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> OK. :) If you use a separate thread, just make sure to wrap all
>>>>>> accesses to the OutputCollector object with synchronized. From what I
>>>>>> see it doesn't look like you use OutputCollector, but maybe, for
>>>>>> example, it's best to ack the messages in your response handler. If many
>>>>>> response handlers can be running simultaneously, then you would need to
>>>>>> synchronize the calls to ack() as they are made on the OutputCollector.
>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>> the synchronization in my Storm bolt concerns only the execute method.
>>>>>>
>>>>>>
>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>
>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>
>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for your response, Itai. I understood you.
>>>>>>> How would you modify the above code to make sure I am synchronizing, or
>>>>>>> making the calls from the same thread?
>>>>>>>
>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>
>>>>>>>> Idan,
>>>>>>>>
>>>>>>>>
>>>>>>>> Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>>> concurrent tuples.... Eventually the operating system or the NIO buffer
>>>>>>>> would exhaust its resources. You would have been better off with
>>>>>>>> throttling.
>>>>>>>>
>>>>>>>>
>>>>>>>> The output collector is the object on which you "ack" or
>>>>>>>> "fail" the tuple. You probably call them from a future callback. Make
>>>>>>>> sure that all of these callbacks are called from the same thread, or are
>>>>>>>> synchronized.
>>>>>>>>
>>>>>>>>
>>>>>>>> Itai
>>>>>>>>
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if it
>>>>>>>> is out of resources. So you should consider using max-spout-pending for
>>>>>>>> spout level throttling.
>>>>>>>>
>>>>>>>>
>>>>>>>> @Itai,
>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't have
>>>>>>>> this problem?
>>>>>>>>
>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>
>>>>>>>> @Michael,
>>>>>>>> I am not sure how the OutputCollector relates to my issue?
>>>>>>>>
>>>>>>>> This is my execute code. Could this code cause any side effects
>>>>>>>> in my topology?
>>>>>>>>
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>
>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>         @Override
>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>             return response;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>             t.printStackTrace();
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>> thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>>
>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>
>>>>>>>>
>>>>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>>>> [email protected]
>>>>>>>>
>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if
>>>>>>>>> it is out of resources. So you should consider using
>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Itai
>>>>>>>>> ------------------------------
>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>> *To:* [email protected]
>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>
>>>>>>>>> I am using the AsyncHttpClient library (
>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is based
>>>>>>>>> on NIO channels to get the response asynchronously without
>>>>>>>>> allocating a thread for each request.
>>>>>>>>>
>>>>>>>>> I was wondering whether this implementation could cause any
>>>>>>>>> side effects within a Storm bolt?
>>>>>>>>>
>>>>>>>>> thank you.
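
For reference, the pattern discussed in this thread (emit and ack from the response callback, fail on error or when no response ever arrives, and synchronize every access to the collector) could be sketched roughly as below. This is only a minimal illustration under stated assumptions, not real Storm code: `AsyncAckSketch`, `RecordingCollector`, and `handle` are hypothetical names invented here, `RecordingCollector` merely stands in for Storm's OutputCollector, and a plain `CompletableFuture` stands in for the async HTTP call. The per-request timeout uses `CompletableFuture.orTimeout`, which needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncAckSketch {

    /** Hypothetical stand-in for Storm's OutputCollector (NOT the real API). */
    static class RecordingCollector {
        // Deliberately a plain ArrayList: like the real collector, not thread safe.
        final List<String> events = new ArrayList<>();

        void emit(String value)   { events.add("emit:" + value); }
        void ack(String tupleId)  { events.add("ack:" + tupleId); }
        void fail(String tupleId) { events.add("fail:" + tupleId); }
    }

    /**
     * What execute() would do: register callbacks and return immediately.
     * The tuple is acked only once the response arrives; an error or a
     * timeout fails it instead.
     */
    static CompletableFuture<Void> handle(String tupleId,
                                          CompletableFuture<String> response,
                                          RecordingCollector collector,
                                          long timeoutMillis) {
        return response
            // Complete exceptionally if no response arrives in time (Java 9+).
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .<Void>handle((body, error) -> {
                // Callbacks may run on any thread, so guard only the
                // collector calls, not the whole execute method.
                synchronized (collector) {
                    if (error == null) {
                        collector.emit(body);    // pass the result downstream
                        collector.ack(tupleId);  // ack after emit
                    } else {
                        collector.fail(tupleId); // or log/emit if replay is unwanted
                    }
                }
                return null;
            });
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();

        // Case 1: the (simulated) response arrives.
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<Void> done = handle("t1", ok, collector, 1000);
        ok.complete("response-body");
        done.join();

        // Case 2: the response never arrives, so the timeout fails the tuple.
        handle("t2", new CompletableFuture<>(), collector, 50).join();

        System.out.println(collector.events);
    }
}
```

If replay is not wanted, as in the requirement at the top of the thread, the failure branch could log and emit/ack instead of calling fail; the synchronization and the timeout would stay the same.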
