"It is better to manually control the acking so that it is not done until
the tuple is fully processed"
Why would I care about that? I get the response in onCompleted or onThrowable,
and if anything goes wrong I'll take care of it there.

By the way, I do have another bolt after this call: a 'Persistor' bolt that
is responsible for persisting the answer into a datasource. I guess that
makes things even more complicated.

Are you talking about something like this?

@Override
public synchronized void execute(Tuple tuple, final BasicOutputCollector basicOutputCollector) {

    // Declared final so the anonymous handler below can capture them.
    final PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
    final String messageId = pushMessage.getMessageId();
    asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
        @Override
        public Response onCompleted(Response response) throws Exception {
            String innerMessageId = messageId;
            System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());

            basicOutputCollector.emit(new Values(pushMessage));

            return response;
        }

        @Override
        public void onThrowable(Throwable t) {
            t.printStackTrace();
        }
    });
}


Note the two additions: the synchronized keyword and the
basicOutputCollector.emit(new Values(pushMessage)); call.

Where would you add the acking (if needed)?
Thank you
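
For reference, the pattern under discussion (ack or fail only once the async
response has arrived, with every collector call synchronized) can be sketched
without Storm or AsyncHttpClient on the classpath. This is only an
illustration under stated assumptions: RecordingCollector and fakePost are
hypothetical stand-ins for the bolt's OutputCollector and the HTTP client,
and execute here is a plain static method, not the real bolt method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AckAfterCompletionSketch {

    /** Hypothetical stand-in for a collector that is NOT thread safe. */
    static class RecordingCollector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
        void emit(String value) { emitted.add(value); }
        void ack(String tupleId) { acked.add(tupleId); }
        void fail(String tupleId) { failed.add(tupleId); }
    }

    /** Hypothetical stand-in for the async HTTP call; it completes on a pool thread. */
    static CompletableFuture<String> fakePost(ExecutorService pool, String messageId, boolean succeed) {
        return CompletableFuture.supplyAsync(() -> {
            if (!succeed) {
                throw new IllegalStateException("request failed for " + messageId);
            }
            return "response-for-" + messageId;
        }, pool);
    }

    /**
     * Plays the role of execute(): it returns immediately and does not ack.
     * The tuple is emitted and acked, or failed, only inside the callback.
     */
    static CompletableFuture<Void> execute(ExecutorService pool, RecordingCollector collector,
                                           String messageId, boolean succeed) {
        return fakePost(pool, messageId, succeed).<Void>handle((body, error) -> {
            // Callbacks may run on several threads at once, so guard the collector.
            synchronized (collector) {
                if (error == null) {
                    collector.emit(messageId); // pass the result on to the next bolt
                    collector.ack(messageId);  // only now is the tuple fully processed
                } else {
                    collector.fail(messageId); // lets the spout replay the tuple
                }
            }
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        RecordingCollector collector = new RecordingCollector();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            // Every tenth request is made to fail, to exercise the fail path.
            pending.add(execute(pool, collector, "msg-" + i, i % 10 != 0));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("acked=" + collector.acked.size() + " failed=" + collector.failed.size());
    }
}
```

In a real topology the same shape would presumably live in a bolt extending
BaseRichBolt, with the ack and fail calls made on the OutputCollector from
inside onCompleted and onThrowable. Putting synchronized on execute alone does
not cover those callbacks, since they run later on the client's own threads.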


2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>
> Sorry, I missed that.  I would recommend you use IRichBolt (or
> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is finished,
> but this is no guarantee that your HTTP Request has actually completed.  It
> is better to manually control the acking so that it is not done until the
> tuple is fully processed.  This will allow you to catch scenarios where the
> request fails and replay accordingly, and will also allow for proper "back
> pressure" by preventing too many simultaneous HTTP requests.  To elaborate,
> in BaseBasicBolt, since you ack the tuple immediately, you will indicate to
> the spout that you are done processing this tree (since your bolt is at the
> end of the topology), and the spout will be able to emit additional
> tuples.  However, you may still be processing tuples that you have acked,
> since you are doing HTTP requests asynchronously.  This may cause you to
> have more requests in flight than you anticipated.
>
> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
> wrote:
>
>> Hi Nathan,
>>
>> Excuse me if I am missing something here. I don't understand why I need to
>> ack at all. I am extending BaseBasicBolt, and we know that it automatically
>> provides anchoring
>> and acking for us.
>>
>> So you are saying that although I am using BaseBasicBolt, I should take care
>> of the acking (and synchronizing it) because I am using async response logic?
>>
>> thanks,
>> Idan.
>>
>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>
>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>> accesses to the OutputCollector object with synchronized.  From what I see
>>> it doesn't look like you use OutputCollector, but maybe, for example, it's
>>> best to ack the messages in your response handler.  If many response
>>> handlers can be running simultaneously, then you would need to synchronize
>>> the calls to ack() as they are made on the OutputCollector.
>>
>>
>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>> wrote:
>>
>>> Oh, I know what synchronized is all about. I just wasn't sure whether the
>>> synchronization in my Storm bolt concerns only the execute method.
>>>
>>>
>>>
>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>
>>>> you should look up the synchronized keyword in java:
>>>>
>>>>
>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>
>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>
>>>
>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>
>>>> Thanks for your response, Itai. I understand.
>>>> How would you modify the above code to make sure I am synchronizing, or
>>>> making the calls from the same thread?
>>>>
>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>
>>>>>  Idan,
>>>>>
>>>>>
>>>>>  Consider you have 1000 concurrent tuples ... and the spout does not
>>>>> throttle traffic. It means that the last bolt would be
>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>> concurrent tuples.... Eventually the operating system or the NIO buffer
>>>>> would exhaust its resources. You would have been better off with
>>>>> throttling.
>>>>>
>>>>>
>>>>>  The output collector is the object on which you "ack" or "fail"
>>>>> the tuple. You probably call them from a future callback. Make sure that
>>>>> all of these callbacks are called from the same thread, or are
>>>>> synchronized.
>>>>>
>>>>>
>>>>>  Itai
>>>>>
>>>>>
>>>>>  ------------------------------
>>>>> *From:* Idan Fridman <[email protected]>
>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>> *To:* [email protected]
>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>
>>>>>
>>>>>  Hi,
>>>>>
>>>>> Any non-blocking bolt does not push back on the previous bolt if it is
>>>>> out of resources. So you should consider using max-spout-pending for spout
>>>>> level throttling.
>>>>>
>>>>>
>>>>>  @Itai,
>>>>> My async bolt is the last bolt in the chain, so I guess I don't have
>>>>> this problem?
>>>>>
>>>>>  Keep in mind you'll need to synchronize the OutputCollector when
>>>>> your NIO response workers handle the returned requests as OutputCollector
>>>>> is not thread safe.
>>>>>
>>>>>  @Michael,
>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>
>>>>>  This is my execute code. Could this code cause any
>>>>> side effects in my topology?
>>>>>
>>>>>
>>>>>  @Override
>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>
>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>         @Override
>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>             String innerMessageId = messageId;
>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>             return response;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onThrowable(Throwable t) {
>>>>>             t.printStackTrace();
>>>>>         }
>>>>>     });
>>>>> }
>>>>>
>>>>>  thanks.
>>>>>
>>>>>
>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>
>>>>>> Keep in mind you'll need to synchronize the OutputCollector when your
>>>>>> NIO response workers handle the returned requests as OutputCollector is
>>>>>> not thread safe.
>>>>>
>>>>>
>>>>>   Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>> [email protected]
>>>>>
>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>>  Hi,
>>>>>>
>>>>>>
>>>>>>  Any non-blocking bolt does not push back on the previous bolt if it
>>>>>> is out of resources. So you should consider using max-spout-pending for
>>>>>> spout level throttling.
>>>>>>
>>>>>>
>>>>>>  Regards,
>>>>>>
>>>>>> Itai
>>>>>>  ------------------------------
>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>
>>>>>>    Hi All,
>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>
>>>>>>   I am using the AsyncHttpClient library (
>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is based on
>>>>>> NIO channels, to get the response asynchronously without allocating a
>>>>>> thread for each request.
>>>>>>
>>>>>>  I was wondering whether this implementation could cause any side effects
>>>>>> within a Storm bolt?
>>>>>>
>>>>>>  thank you.
>>>>>>
>>>>>
>>>
>>
>
