Sorry, I missed that. I would recommend you use IRichBolt (or BaseRichBolt). In BaseBasicBolt, you will ack once execute is finished, but this is no guarantee that your HTTP Request has actually completed. It is better to manually control the acking so that it is not done until the tuple is fully processed. This will allow you to catch scenarios where the request fails and replay accordingly, and will also allow for proper "back pressure" by preventing too many simultaneous HTTP requests. To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you will indicate to the spout that you are done processing this tree (since your bolt is at the end of the topology), and the spout will be able to emit additional tuples. However, you may still be processing tuples that you have acked, since you are doing HTTP requests asynchronously. This may cause you to have more requests in flight that you anticipated.
On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]> wrote: > Hi Nathan, > > excuse if I miss something here. I dont understand why I need to ack at > all? I am extending from BaseBasicBolt. and we know that it's automatically > provides anchoring > and acking for us. > > So you saying although I am using BaseBasicBolt I should take care of > the Acking(and synchronizing it) because I am using async responses logic? > > thanks, > Idan. > > 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>: >> >> OK. :) If you use a separate thread, just make sure to wrap all >> accesses to the OutputCollector object with synchronized. From what I see >> it doesn't look like you use OutputCollector, but maybe, for example, it's >> best to ack the messages in your response handler. If many response >> handlers can be running simultaneously, then you would need to synchronize >> the calls to ack() as they are made on the OutputCollector. > > > On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]> > wrote: > >> Oh I know what is all synchronized about. Just I wasn't sure if the >> synchronization >> in my storm bolt is concerned only to the execute method. >> >> >> >> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>: >>> >>> you should look up the synchronized keyword in java: >>> >>> >>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html >>> >>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html >>> >> >> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]> >> wrote: >> >>> Thanks for your response Itay. I understood you. >>> How would you modify the above code to make sure I am synchronizing/make >>> calls from the same Thread ? >>> >>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>: >>>> >>>> Idan, >>>> >>>> >>>> Consider you have 1000 concurrent tuples ... and the spout does not >>>> throttle traffic. It means that the last bolt would be >>>> handling 1000 concurrent requests. Now consider you have 100,000 concurrent >>>> tuples.... Eventually the operating system or the NIO buffer would exhaust >>>> its resources. You would have been better off with throtteling. >>>> >>>> >>>> The output collector is the object that you perform "ack" or "fail" >>>> the tuple. You probably call them from a future callback. Make sure that >>>> all of these callbacks are called from the same thread, or are >>>> synchronized. >>>> >>>> >>>> Itai >>>> >>>> >>>> ------------------------------ >>>> *From:* Idan Fridman <[email protected]> >>>> *Sent:* Tuesday, December 16, 2014 3:58 PM >>>> *To:* [email protected] >>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt >>>> >>>> >>>> Hi, >>>> >>>> Any non-blocking bolt does not push back on the previous bolt if it is >>>> out of resources. So you should consider using max-spout-pending for spout >>>> level throttling. >>>> >>>> >>>> @Itai, >>>> My async bolt is the last bolt in the chain. so i guess I dont have >>>> this problem?? >>>> >>>> Keep in mind you'll need to synchronize the OutputCollector when your >>>> NIO response workers handle the returned requests as OutputCollector is not >>>> thread safe. >>>> >>>> @Michael, >>>> I am not sure how the OutputCollector is concerned to my issue? >>>> >>>> This is my execution code.. that code could caouse me any >>>> side-effects in my topology? >>>> >>>> >>>> @Override >>>> public void execute(Tuple tuple, BasicOutputCollector >>>> basicOutputCollector) { >>>> >>>> PushMessage pushMessage = (PushMessage) >>>> tuple.getValueByField("pushMessage"); >>>> final String messageId = pushMessage.getMessageId(); >>>> asyncHttpClient.preparePost("some_url").execute(new >>>> AsyncCompletionHandler<Response>() { >>>> @Override >>>> public Response onCompleted(Response response) throws Exception { >>>> String innerMessageId = messageId; >>>> System.out.printf("\n messageId=" + innerMessageId + >>>> "responseBody=" + response.getResponseBody()); >>>> return response; >>>> } >>>> >>>> @Override >>>> public void onThrowable(Throwable t) { >>>> t.printStackTrace(); >>>> } >>>> }); >>>> } >>>> >>>> thanks. >>>> >>>> >>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>: >>>>> >>>>> Keep in mind you'll need to synchronize the OutputCollector when your >>>>> NIO response workers handle the returned requests as OutputCollector is >>>>> not >>>>> thread safe. >>>> >>>> >>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>) >>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/> >>>> [email protected] >>>> >>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]> wrote: >>>>> >>>>> Hi, >>>>> >>>>> >>>>> Any non-blocking bolt does not push back on the previous bolt if it >>>>> is out of resources. So you should consider using max-spout-pending for >>>>> spout level throttling. >>>>> >>>>> >>>>> Regards, >>>>> >>>>> Itai >>>>> ------------------------------ >>>>> *From:* Idan Fridman <[email protected]> >>>>> *Sent:* Monday, December 15, 2014 10:19 AM >>>>> *To:* [email protected] >>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt >>>>> >>>>> Hi All, >>>>> My bolt need to dispatch async request to remote service. >>>>> >>>>> I am using AsyncHttpReuest library( >>>>> https://github.com/AsyncHttpClient/async-http-client) which based on >>>>> NIO channels to get the response asynchronously while not allocating >>>>> Thread >>>>> for each request. >>>>> >>>>> I was wondering if any side-effects could cause this implementation >>>>> within Storm Bolt ? >>>>> >>>>> thank you. >>>>> >>>> >> >
