" It is better to manually control the acking so that it is not done until
the tuple is fully processed"
Why would I care about that? I get the response in onCompleted or onThrowable,
and if anything goes wrong I'll take care of it there.
BTW: apparently I do have another bolt after this call. It's a 'Persistor'
bolt which is responsible for persisting the answer into a datasource. I guess
that makes things even more complicated.
Are you talking about something like this?
@Override
public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
    PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
    final String messageId = pushMessage.getMessageId();
    asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
        @Override
        public Response onCompleted(Response response) throws Exception {
            String innerMessageId = messageId;
            System.out.printf("\n messageId=" + innerMessageId +
                    "responseBody=" + response.getResponseBody());
            basicOutputCollector.emit(new Values(pushMessage));
            return response;
        }

        @Override
        public void onThrowable(Throwable t) {
            t.printStackTrace();
        }
    });
}
Note the added synchronized keyword and the added
basicOutputCollector.emit(new Values(pushMessage)); call.
Where would you add the acking (if needed)?
Thank you
2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>
> Sorry, I missed that. I would recommend you use IRichBolt (or
> BaseRichBolt). In BaseBasicBolt, you will ack once execute is finished,
> but this is no guarantee that your HTTP Request has actually completed. It
> is better to manually control the acking so that it is not done until the
> tuple is fully processed. This will allow you to catch scenarios where the
> request fails and replay accordingly, and will also allow for proper "back
> pressure" by preventing too many simultaneous HTTP requests. To elaborate,
> in BaseBasicBolt, since you ack the tuple immediately, you will indicate to
> the spout that you are done processing this tree (since your bolt is at the
> end of the topology), and the spout will be able to emit additional
> tuples. However, you may still be processing tuples that you have acked,
> since you are doing HTTP requests asynchronously. This may cause you to
> have more requests in flight than you anticipated.
>
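A minimal sketch of the idea above, acking only once the asynchronous call has finished, written against plain JDK classes so that it runs without Storm or AsyncHttpClient on the classpath. AckTracker, sendAsync and the tuple ids are hypothetical stand-ins: in a real topology the tracker's role would be played by the OutputCollector of a BaseRichBolt, with ack(tuple) called from onCompleted and fail(tuple) from onThrowable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DeferredAckSketch {

    /** Hypothetical stand-in for Storm's OutputCollector: records ack/fail calls. */
    static class AckTracker {
        private final List<String> acked = new ArrayList<>();
        private final List<String> failed = new ArrayList<>();

        // synchronized because completion callbacks may run on other threads
        synchronized void ack(String tupleId) { acked.add(tupleId); }
        synchronized void fail(String tupleId) { failed.add(tupleId); }
        synchronized List<String> acked() { return new ArrayList<>(acked); }
        synchronized List<String> failed() { return new ArrayList<>(failed); }
    }

    /** Hypothetical async call; completes exceptionally when shouldFail is true. */
    static CompletableFuture<String> sendAsync(String tupleId, boolean shouldFail) {
        return CompletableFuture.supplyAsync(() -> {
            if (shouldFail) {
                throw new IllegalStateException("request failed for " + tupleId);
            }
            return "200 OK for " + tupleId;
        });
    }

    /** Plays the role of execute(): starts the request and returns without acking. */
    static CompletableFuture<Void> execute(String tupleId, boolean shouldFail, AckTracker tracker) {
        return sendAsync(tupleId, shouldFail).handle((response, error) -> {
            if (error == null) {
                tracker.ack(tupleId);   // corresponds to onCompleted
            } else {
                tracker.fail(tupleId);  // corresponds to onThrowable; the spout could replay
            }
            return null;
        });
    }

    /** Runs one successful and one failing request and waits for both callbacks. */
    static AckTracker runDemo() {
        AckTracker tracker = new AckTracker();
        CompletableFuture<Void> ok = execute("tuple-1", false, tracker);
        CompletableFuture<Void> bad = execute("tuple-2", true, tracker);
        CompletableFuture.allOf(ok, bad).join();
        return tracker;
    }

    public static void main(String[] args) {
        AckTracker tracker = runDemo();
        System.out.println("acked=" + tracker.acked() + " failed=" + tracker.failed());
    }
}
```

The point of the design is that execute() returns immediately, while the ack or fail is deferred to the completion callback, so a failed request is reported as a failure instead of being acked by default.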
> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
> wrote:
>
>> Hi Nathan,
>>
>> Excuse me if I am missing something here. I don't understand why I need to
>> ack at all. I am extending BaseBasicBolt, and we know that it automatically
>> provides anchoring and acking for us.
>>
>> So you are saying that although I am using BaseBasicBolt I should take care
>> of the acking (and synchronize it) because I am using async response logic?
>>
>> thanks,
>> Idan.
>>
>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>
>>> OK. :) If you use a separate thread, just make sure to wrap all
>>> accesses to the OutputCollector object with synchronized. From what I see
>>> it doesn't look like you use OutputCollector, but maybe, for example, it's
>>> best to ack the messages in your response handler. If many response
>>> handlers can be running simultaneously, then you would need to synchronize
>>> the calls to ack() as they are made on the OutputCollector.
>>
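To illustrate the advice above, here is a small JDK-only sketch in which several callback threads report to one shared, non-thread-safe object and every access goes through a single lock. UnsafeCollector and SynchronizedAcker are hypothetical names standing in for Storm's OutputCollector and for whatever wrapper the bolt would use; this is not Storm API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedCollectorSketch {

    /** Hypothetical non-thread-safe collector, like OutputCollector in this thread. */
    static class UnsafeCollector {
        private final List<Integer> acked = new ArrayList<>(); // ArrayList is not thread safe
        void ack(int tupleId) { acked.add(tupleId); }
        int ackCount() { return acked.size(); }
    }

    /** Wrapper that funnels every call through one lock. */
    static class SynchronizedAcker {
        private final UnsafeCollector collector;

        SynchronizedAcker(UnsafeCollector collector) { this.collector = collector; }

        void ack(int tupleId) {
            synchronized (collector) {
                collector.ack(tupleId);
            }
        }

        int ackCount() {
            synchronized (collector) {
                return collector.ackCount();
            }
        }
    }

    /** Simulates `callbacks` response handlers finishing on a pool of worker threads. */
    static int ackFromManyThreads(int callbacks, int threads) {
        SynchronizedAcker acker = new SynchronizedAcker(new UnsafeCollector());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < callbacks; i++) {
            final int tupleId = i;
            pool.execute(() -> acker.ack(tupleId));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acker.ackCount();
    }

    public static void main(String[] args) {
        System.out.println("acks recorded: " + ackFromManyThreads(10_000, 8));
    }
}
```

Locking on the collector itself, rather than marking execute() as synchronized, matters here: the callbacks run on other threads after execute() has already returned, so a lock held only during execute() would not protect them.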
>>
>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>> wrote:
>>
>>> Oh, I know what synchronized is all about. I just wasn't sure whether the
>>> synchronization in my Storm bolt concerns only the execute method.
>>>
>>>
>>>
>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>
>>>> You should look up the synchronized keyword in Java:
>>>>
>>>>
>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>
>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>
>>>
>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>
>>>> Thanks for your response, Itai. I understand.
>>>> How would you modify the above code to make sure I am synchronizing, or
>>>> making the calls from the same thread?
>>>>
>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>
>>>>> Idan,
>>>>>
>>>>>
>>>>> Suppose you have 1000 concurrent tuples and the spout does not
>>>>> throttle traffic. That means the last bolt would be
>>>>> handling 1000 concurrent requests. Now suppose you have 100,000
>>>>> concurrent tuples. Eventually the operating system or the NIO buffer
>>>>> would exhaust its resources. You would have been better off with
>>>>> throttling.
>>>>>
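As a rough illustration of the throttling point, the sketch below caps the number of requests in flight with a Semaphore, which is conceptually what a max-spout-pending limit achieves at the spout. It uses only JDK classes; runThrottled and the simulated request are hypothetical, and the 5 ms sleep merely stands in for a remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightThrottleSketch {

    /**
     * Issues `total` simulated requests while allowing at most `maxPending`
     * to be outstanding, and returns the highest concurrency observed.
     */
    static int runThrottled(int total, int maxPending) {
        Semaphore permits = new Semaphore(maxPending);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletableFuture<?>[] requests = new CompletableFuture<?>[total];
        try {
            for (int i = 0; i < total; i++) {
                permits.acquireUninterruptibly(); // blocks the producer, i.e. back pressure
                requests[i] = CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // stand-in for the remote call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // like an ack: frees a slot for the next request
                    }
                }, pool);
            }
            CompletableFuture.allOf(requests).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight: " + runThrottled(200, 10));
    }
}
```

Without the permit, all 200 requests would be submitted at once; with it, the producer stalls whenever the limit is reached, which is the behaviour a non-blocking bolt does not provide on its own.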
>>>>>
>>>>> The output collector is the object on which you "ack" or "fail"
>>>>> the tuple. You probably call these from a future callback. Make sure that
>>>>> all of these callbacks are called from the same thread, or are
>>>>> synchronized.
>>>>>
>>>>>
>>>>> Itai
>>>>>
>>>>>
>>>>> ------------------------------
>>>>> *From:* Idan Fridman <[email protected]>
>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>> *To:* [email protected]
>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> Any non-blocking bolt does not push back on the previous bolt if it is
>>>>> out of resources. So you should consider using max-spout-pending for spout
>>>>> level throttling.
>>>>>
>>>>>
>>>>> @Itai,
>>>>> My async bolt is the last bolt in the chain, so I guess I don't have
>>>>> this problem?
>>>>>
>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>> your NIO response workers handle the returned requests as OutputCollector
>>>>> is not thread safe.
>>>>>
>>>>> @Michael,
>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>
>>>>> This is my execute code. Could this code cause any
>>>>> side effects in my topology?
>>>>>
>>>>>
>>>>> @Override
>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>
>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>         @Override
>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>             String innerMessageId = messageId;
>>>>>             System.out.printf("\n messageId=" + innerMessageId +
>>>>>                     "responseBody=" + response.getResponseBody());
>>>>>             return response;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onThrowable(Throwable t) {
>>>>>             t.printStackTrace();
>>>>>         }
>>>>>     });
>>>>> }
>>>>>
>>>>> thanks.
>>>>>
>>>>>
>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>
>>>>>> Keep in mind you'll need to synchronize the OutputCollector when your
>>>>>> NIO response workers handle the returned requests as OutputCollector is
>>>>>> not thread safe.
>>>>>
>>>>>
>>>>> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>> [email protected]
>>>>>
>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> Any non-blocking bolt does not push back on the previous bolt if it
>>>>>> is out of resources. So you should consider using max-spout-pending for
>>>>>> spout level throttling.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Itai
>>>>>> ------------------------------
>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>
>>>>>> Hi All,
>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>
>>>>>> I am using the AsyncHttpClient library (
>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is based on
>>>>>> NIO channels, to get the response asynchronously without allocating a
>>>>>> thread for each request.
>>>>>>
>>>>>> I was wondering whether this implementation could cause any side effects
>>>>>> within a Storm bolt?
>>>>>>
>>>>>> thank you.
>>>>>>
>>>>>
>>>
>>
>