it's required that you synchronize on the output collector.  If you need
higher throughput through this bolt and are limited by synchronizing on the
output collector, you will need to add more tasks.

I would emit / ack on the completion of the HTTP request.  This means that
if you don't get a response, your tuple will time out (and, if you have
reliable message handling enabled, the fail method will be called on the
spout).  If you catch the error condition yourself, you can always explicitly
fail the tuple using the output collector, or retry manually.
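
A minimal sketch of that pattern, using plain Java stand-ins rather than the
real Storm and AsyncHttpClient classes so that it runs on its own.
FakeCollector, AsyncAckSketch and demo() are hypothetical names invented for
this illustration, a tuple is just a String, and a CompletableFuture plays the
role of the async HTTP response.  It does not model the tuple timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Storm's OutputCollector. Like the real one it is
// NOT thread safe: it records calls in plain ArrayLists.
class FakeCollector {
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    void emit(String anchor, String value) { emitted.add(value); }
    void ack(String tuple) { acked.add(tuple); }
    void fail(String tuple) { failed.add(tuple); }
}

class AsyncAckSketch {
    private final FakeCollector collector;

    AsyncAckSketch(FakeCollector collector) { this.collector = collector; }

    // Plays the role of execute(Tuple): it only registers a callback and
    // returns; nothing is acked until the (simulated) response arrives.
    CompletableFuture<Void> execute(String tuple, CompletableFuture<String> response) {
        return response.<Void>handle((body, error) -> {
            // Callbacks may run on several threads, so every collector
            // access goes through the same lock.
            synchronized (collector) {
                if (error == null) {
                    collector.emit(tuple, body); // emit anchored to the input
                    collector.ack(tuple);        // ack only after the emit
                } else {
                    collector.fail(tuple);       // explicit fail, no waiting for a timeout
                }
            }
            return null;
        });
    }

    // Drives 100 simulated requests on 4 threads; every tenth one fails.
    static FakeCollector demo() {
        FakeCollector collector = new FakeCollector();
        AsyncAckSketch bolt = new AsyncAckSketch(collector);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> {
                if (n % 10 == 0) throw new IllegalStateException("simulated HTTP failure");
                return "body-" + n;
            }, pool);
            pending.add(bolt.execute("tuple-" + n, response));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return collector;
    }

    public static void main(String[] args) {
        FakeCollector c = demo();
        System.out.println("acked=" + c.acked.size() + " failed=" + c.failed.size());
    }
}
```

In a real BaseRichBolt the collector would be the OutputCollector handed to
prepare(), and the success and failure branches would live in onCompleted and
onThrowable of your AsyncCompletionHandler.  The point is only that the lock
covers the emit / ack / fail calls, not the whole execute method.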

I would reiterate that how you handle the failure states depends on the
requirements of your system.  I'm assuming that you want to make use of
Storm's at-least-once semantics, but as mentioned, if your requirements
don't demand these semantics, you can simplify your logic (and possibly
even use BaseBasicBolt as you were before).

On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]> wrote:
>
> 1. I am afraid that synchronizing on basicOutputCollector will hurt
> performance. What do you think?
>
> 2. What do you think about emitting in onCompleted of the async response to
> the next bolt, as shown below? That means the topology will continue to the
> next bolt only after I get a response from the external service (what happens
> if I never get a response? I need to solve this).
> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>
>> You would care if you wanted to replay the tuple tree in case of
>> failure.  Consider what happens if your bolt thread fails, or if the worker
>> process fails.  Will you recover properly?  Is it a requirement to recover
>> properly in this scenario?  (it might not be, sometimes it's ok to lose
>> data).
>>
>> Regarding synchronization and acking, I would ack after emit, and
>> synchronize access to emit and ack only, not the entire execute method.
>> Probably something like synchronized(basicOutputCollector) { ... }.
>>
>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>> wrote:
>>
>>> " It is better to manually control the acking so that it is not done
>>> until the tuple is fully processed"
>>> Why would I care about that? I get the response in onCompleted, or in
>>> onThrowable if anything goes wrong, and I'll take care of it there.
>>>
>>> By the way, I do have another bolt after this call. It's a
>>> 'Persistor' bolt which is responsible for persisting the answer into a datasource.
>>> I guess that makes things even more complicated:
>>>
>>> Are you talking about something like this?
>>>
>>> @Override
>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>
>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>     final String messageId = pushMessage.getMessageId();
>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>         @Override
>>>         public Response onCompleted(Response response) throws Exception {
>>>             String innerMessageId = messageId;
>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>
>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>
>>>             return response;
>>>         }
>>>
>>>         @Override
>>>         public void onThrowable(Throwable t) {
>>>             t.printStackTrace();
>>>         }
>>>     });
>>> }
>>>
>>>
>>> check the synchronized and the basicOutputCollector.emit(new
>>> Values(pushMessage)); additions
>>>
>>> Where would you add the acking(if needed) ?
>>> thank you
>>>
>>>
>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>
>>>> Sorry, I missed that.  I would recommend you use IRichBolt (or
>>>> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is finished,
>>>> but this is no guarantee that your HTTP Request has actually completed.  It
>>>> is better to manually control the acking so that it is not done until the
>>>> tuple is fully processed.  This will allow you to catch scenarios where the
>>>> request fails and replay accordingly, and will also allow for proper "back
>>>> pressure" by preventing too many simultaneous HTTP requests.  To elaborate,
>>>> in BaseBasicBolt, since you ack the tuple immediately, you will indicate to
>>>> the spout that you are done processing this tree (since your bolt is at the
>>>> end of the topology), and the spout will be able to emit additional
>>>> tuples.  However, you may still be processing tuples that you have acked,
>>>> since you are doing HTTP requests asynchronously.  This may cause you to
>>>> have more requests in flight than you anticipated.
>>>
>>>
>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>
>>>> Hi Nathan,
>>>>
>>>> Excuse me if I'm missing something here. I don't understand why I need to
>>>> ack at all. I am extending BaseBasicBolt, and we know that it automatically
>>>> provides anchoring and acking for us.
>>>>
>>>> So you are saying that although I am using BaseBasicBolt, I should take care
>>>> of the acking (and synchronizing it) because I am using async response logic?
>>>>
>>>> thanks,
>>>> Idan.
>>>>
>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>>>> accesses to the OutputCollector object with synchronized.  From what I see
>>>>> it doesn't look like you use OutputCollector, but maybe, for example, it's
>>>>> best to ack the messages in your response handler.  If many response
>>>>> handlers can be running simultaneously, then you would need to synchronize
>>>>> the calls to ack() as they are made on the OutputCollector.
>>>>
>>>>
>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>
>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether the
>>>>> synchronization in my Storm bolt concerns only the execute method.
>>>>>
>>>>>
>>>>>
>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> you should look up the synchronized keyword in java:
>>>>>>
>>>>>>
>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>
>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for your response Itay. I understood you.
>>>>>> How would you modify the above code to make sure I am synchronizing/make
>>>>>> calls from the same Thread ?
>>>>>>
>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>
>>>>>>>  Idan,
>>>>>>>
>>>>>>>
>>>>>>>  Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>> concurrent tuples.... Eventually the operating system or the NIO buffer
>>>>>>> would exhaust its resources. You would have been better off with
>>>>>>> throttling.
>>>>>>>
>>>>>>>
>>>>>>>  The output collector is the object on which you "ack" or
>>>>>>> "fail" the tuple. You probably call them from a future callback. Make
>>>>>>> sure that all of these callbacks are called from the same thread, or are
>>>>>>> synchronized.
>>>>>>>
>>>>>>>
>>>>>>>  Itai
>>>>>>>
>>>>>>>
>>>>>>>  ------------------------------
>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>
>>>>>>>
>>>>>>>  Hi,
>>>>>>>
>>>>>>> Any non-blocking bolt does not push back on the previous bolt if it
>>>>>>> is out of resources. So you should consider using max-spout-pending for
>>>>>>> spout level throttling.
>>>>>>>
>>>>>>>
>>>>>>>  @Itai,
>>>>>>> My async bolt is the last bolt in the chain. so i guess I dont have
>>>>>>> this problem??
>>>>>>>
>>>>>>>  Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>> your NIO response workers handle the returned requests as
>>>>>>> OutputCollector is not thread safe.
>>>>>>>
>>>>>>>  @Michael,
>>>>>>> I am not sure how the OutputCollector relates to my issue?
>>>>>>>
>>>>>>>  This is my execution code. Could this code cause any
>>>>>>> side effects in my topology?
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>
>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>         @Override
>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>             String innerMessageId = messageId;
>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>             return response;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>             t.printStackTrace();
>>>>>>>         }
>>>>>>>     });
>>>>>>> }
>>>>>>>
>>>>>>>  thanks.
>>>>>>>
>>>>>>>
>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>
>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>> OutputCollector is not thread safe.
>>>>>>>
>>>>>>>
>>>>>>>   Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>  Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>>  Any non-blocking bolt does not push back on the previous bolt if
>>>>>>>> it is out of resources. So you should consider using max-spout-pending
>>>>>>>> for spout level throttling.
>>>>>>>>
>>>>>>>>
>>>>>>>>  Regards,
>>>>>>>>
>>>>>>>> Itai
>>>>>>>>  ------------------------------
>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>
>>>>>>>>    Hi All,
>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>
>>>>>>>>   I am using the AsyncHttpClient library (
>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is based
>>>>>>>> on NIO channels, to get the response asynchronously without allocating
>>>>>>>> a thread for each request.
>>>>>>>>
>>>>>>>>  I was wondering whether this implementation could cause any
>>>>>>>> side effects within a Storm bolt?
>>>>>>>>
>>>>>>>>  thank you.
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
