@Nathan:

Right now the requirement is to emit the tuple in onFail and onComplete without
retrying (as I wrote, we are emitting into a Persister bolt).

1. If I am using BaseBasicBolt, can I still ack manually?
2. Why should I ack manually in the final response clauses while using
BaseBasicBolt?


* "..This means that if you don't get a response your tuple will get timed
out.."


3. I didn't get how I can set a timeout for a specific bolt (so that in my
case, if I never get a response, I'll be able to handle this scenario without
necessarily retrying, maybe just by logging).

Thank you.





2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>
> It's required that you synchronize on the output collector.  If you need
> higher throughput through this bolt and are limited by synchronizing on the
> output collector, you will need to add more tasks.
>
> I would emit / ack on the completion of the HTTP request.  This means that
> if you don't get a response your tuple will get timed out (and if you have
> reliable message handling the fail method will be called on the spout).  If
> you catch the error condition yourself you can always explicitly fail the
> tuple using the output collector, or retry manually.
>
> I would reiterate that how you handle the failure states depends on the
> requirements of your system.  I'm assuming that you want to make use of
> storm's at least once semantics, but as mentioned if your requirements
> don't demand these semantics, you can simplify your logic (and possibly
> even use BaseBasicBolt as you were before).
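
On the question of what to do when a response never arrives: Storm's tuple timeout (topology.message.timeout.secs) is a topology-wide setting rather than a per-bolt one, so one option is to bound the wait inside the bolt itself. What follows is only a minimal sketch under that assumption, using nothing but the JDK. `ResponseTimeoutSketch`, `Outcome`, `decide` and `simulate` are hypothetical names, and a plain `CompletableFuture` stands in for the async HTTP call. In a real bolt the ACK / FAIL outcome would be turned into a synchronized ack or fail on the collector, or just a log line if no retry is wanted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ResponseTimeoutSketch {
    /** What the bolt would do with the tuple once the request is settled. */
    enum Outcome { ACK, FAIL }

    /**
     * Settles 'response' within timeoutMillis without blocking the caller.
     * 'response' is a stand-in for the async HTTP call and may never complete.
     */
    static CompletableFuture<Outcome> decide(CompletableFuture<String> response,
                                             long timeoutMillis,
                                             ScheduledExecutorService timer) {
        // If the timer fires first, the future completes exceptionally;
        // if the response has already arrived, this call has no effect.
        timer.schedule(() -> {
            response.completeExceptionally(new TimeoutException("no response"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // handle() sees both the success and the failure path.
        return response.handle((body, error) -> error == null ? Outcome.ACK : Outcome.FAIL);
    }

    /** Demo helper: the remote service either answers immediately or never. */
    static Outcome simulate(boolean serverAnswers) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            if (serverAnswers) {
                response.complete("200 OK");
            }
            return decide(response, 100, timer).join();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("answered request -> " + simulate(true));
        System.out.println("silent request   -> " + simulate(false));
    }
}
```

The timeout here is per request, so it can be much shorter than the topology-wide tuple timeout, and the failure branch is free to log instead of triggering a replay.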
>
> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]> wrote:
>>
>> 1. I'm afraid that synchronizing on basicOutputCollector will hurt
>> performance. What do you think?
>>
>> 2. What do you think about emitting to the next bolt in onComplete of the
>> async response, as shown below? That means the topology will continue to the
>> next bolt only after I get a response from the external service (what happens
>> if I never get a response? I need to solve this).
>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>
>>> You would care if you wanted to replay the tuple tree in case of
>>> failure.  Consider what happens if your bolt thread fails, or if the worker
>>> process fails.  Will you recover properly?  Is it a requirement to recover
>>> properly in this scenario?  (it might not be, sometimes it's ok to lose
>>> data).
>>>
>>> Regarding synchronization and acking, I would ack after emit, and
>>> synchronize access to emit and ack only, not the entire execute method.
>>> Probably something like synchronized(basicOutputCollector) { ... }.
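
The advice above (ack after emit, and lock only around those two calls) can be sketched roughly as follows. This is an illustration, not Storm code: `RecordingCollector` is a hypothetical, deliberately non-thread-safe stand-in for the output collector, and a thread pool plays the role of the HTTP client's callback threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a collector that is NOT thread safe. */
class RecordingCollector {
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    void emit(String value) { emitted.add(value); }
    void ack(String tupleId) { acked.add(tupleId); }
}

class SynchronizedEmitSketch {
    /** Simulates n async callbacks completing on a pool of worker threads. */
    static RecordingCollector run(int n) {
        final RecordingCollector collector = new RecordingCollector();
        ExecutorService callbacks = Executors.newFixedThreadPool(8);
        for (int i = 0; i < n; i++) {
            final String id = "msg-" + i;
            callbacks.execute(() -> {
                // Work that does not touch the collector stays outside the lock.
                String result = id.toUpperCase();
                // Only the collector calls are serialized: emit first, then ack.
                synchronized (collector) {
                    collector.emit(result);
                    collector.ack(id);
                }
            });
        }
        callbacks.shutdown();
        try {
            if (!callbacks.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return collector;
    }

    public static void main(String[] args) {
        RecordingCollector c = run(1000);
        System.out.println(c.emitted.size() + " emitted, " + c.acked.size() + " acked");
    }
}
```

Keeping the synchronized block this small is what limits the throughput cost: the callbacks only queue up for the short emit and ack calls, not for the whole request.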
>>>
>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>> wrote:
>>>
>>>> "It is better to manually control the acking so that it is not done
>>>> until the tuple is fully processed"
>>>> Why would I care about that? I get the response in onCompleted or
>>>> onThrowable; if anything happens I'll take care of it there.
>>>>
>>>> By the way, I do have another bolt after this call. It's a
>>>> 'Persistor' bolt, which is responsible for persisting the answer into a
>>>> datasource. I guess that makes things even more complicated:
>>>>
>>>> Are you talking about something like this?
>>>>
>>>> @Override
>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>
>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>     final String messageId = pushMessage.getMessageId();
>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>         @Override
>>>>         public Response onCompleted(Response response) throws Exception {
>>>>             String innerMessageId = messageId;
>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>
>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>
>>>>             return response;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onThrowable(Throwable t) {
>>>>             t.printStackTrace();
>>>>         }
>>>>     });
>>>> }
>>>>
>>>>
>>>> Note the added synchronized keyword and the
>>>> basicOutputCollector.emit(new Values(pushMessage)); call.
>>>>
>>>> Where would you add the acking (if needed)?
>>>> Thank you.
>>>>
>>>>
>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> Sorry, I missed that.  I would recommend you use IRichBolt (or
>>>>> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is finished,
>>>>> but this is no guarantee that your HTTP request has actually completed.
>>>>> It is better to manually control the acking so that it is not done until
>>>>> the tuple is fully processed.  This will allow you to catch scenarios
>>>>> where the request fails and replay accordingly, and will also allow for
>>>>> proper "back pressure" by preventing too many simultaneous HTTP requests.
>>>>> To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you
>>>>> will indicate to the spout that you are done processing this tree (since
>>>>> your bolt is at the end of the topology), and the spout will be able to
>>>>> emit additional tuples.  However, you may still be processing tuples that
>>>>> you have acked, since you are doing HTTP requests asynchronously.  This
>>>>> may cause you to have more requests in flight than you anticipated.
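
The back-pressure effect described above can be illustrated with a small JDK-only simulation. This is a sketch under stated assumptions, not Storm's implementation: a `Semaphore` plays the role of the spout's pending limit, releasing a permit stands in for the ack, and `PendingLimitSketch` and `run` are hypothetical names. Because the permit is released only when the simulated request has really finished, the number of requests in flight never exceeds the limit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PendingLimitSketch {
    /** Returns the highest number of requests that were in flight at once. */
    static int run(int totalTuples, int maxPending) {
        final Semaphore pending = new Semaphore(maxPending); // stand-in for the pending limit
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService httpWorkers = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < totalTuples; i++) {
                // The "spout" blocks here while maxPending tuples are still un-acked.
                pending.acquire();
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                httpWorkers.execute(() -> {
                    try {
                        Thread.sleep(2); // pretend to wait for the HTTP response
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        pending.release(); // the "ack": frees a slot only when the work is done
                    }
                });
            }
            httpWorkers.shutdown();
            if (!httpWorkers.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("requests did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            httpWorkers.shutdownNow(); // harmless after a clean shutdown
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + run(200, 5));
    }
}
```

If the permit were released as soon as the request was submitted (the analogue of acking at the end of execute), the peak would be limited only by how fast tuples arrive.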
>>>>
>>>>
>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Nathan,
>>>>>
>>>>> Excuse me if I'm missing something here. I don't understand why I need
>>>>> to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>> automatically provides anchoring and acking for us.
>>>>>
>>>>> So you are saying that although I am using BaseBasicBolt, I should take
>>>>> care of the acking (and synchronizing it) because I am using async
>>>>> response logic?
>>>>>
>>>>> thanks,
>>>>> Idan.
>>>>>
>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>>>>> accesses to the OutputCollector object with synchronized.  From what I
>>>>>> see it doesn't look like you use OutputCollector, but maybe, for example,
>>>>>> it's best to ack the messages in your response handler.  If many response
>>>>>> handlers can be running simultaneously, then you would need to
>>>>>> synchronize the calls to ack() as they are made on the OutputCollector.
>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>> the synchronization in my Storm bolt concerns only the execute method.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>
>>>>>>>
>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>
>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for your response, Itai. I understand.
>>>>>>> How would you modify the above code to make sure I am synchronizing
>>>>>>> or making calls from the same thread?
>>>>>>>
>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>
>>>>>>>>  Idan,
>>>>>>>>
>>>>>>>>
>>>>>>>>  Consider that you have 1000 concurrent tuples ... and the spout does
>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>> handling 1000 concurrent requests. Now consider that you have 100,000
>>>>>>>> concurrent tuples.... Eventually the operating system or the NIO buffer
>>>>>>>> would exhaust its resources. You would be better off with throttling.
>>>>>>>>
>>>>>>>>
>>>>>>>>  The output collector is the object on which you "ack" or
>>>>>>>> "fail" the tuple. You probably call these from a future callback. Make
>>>>>>>> sure that all of these callbacks are called from the same thread, or are
>>>>>>>> synchronized.
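
The "same thread" alternative mentioned above can be sketched as a hand-off queue: callback threads only enqueue finished message ids, and a single thread drains the queue and is the only one that would touch the collector. This is a JDK-only illustration under that assumption. `SingleThreadDrainSketch` and `run` are hypothetical names, and adding to the `acked` list stands in for the real ack call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SingleThreadDrainSketch {
    /** Completes n simulated requests and acks them all from the calling thread. */
    static List<String> run(int n) {
        final BlockingQueue<String> completed = new LinkedBlockingQueue<>();
        ExecutorService callbacks = Executors.newFixedThreadPool(8);
        for (int i = 0; i < n; i++) {
            final String id = "msg-" + i;
            // Callback threads never touch the collector; they only enqueue.
            callbacks.execute(() -> completed.add(id));
        }
        callbacks.shutdown();

        // Only this thread "acks", so no lock is needed around the collector.
        List<String> acked = new ArrayList<>();
        try {
            while (acked.size() < n) {
                String id = completed.poll(5, TimeUnit.SECONDS);
                if (id == null) {
                    throw new IllegalStateException("a callback never arrived");
                }
                acked.add(id); // stand-in for the real ack call
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acked;
    }

    public static void main(String[] args) {
        System.out.println("acked from one thread: " + run(1000).size());
    }
}
```

The trade-off against a synchronized block is that something has to drain the queue regularly, so completed requests are acked slightly later than they finish.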
>>>>>>>>
>>>>>>>>
>>>>>>>>  Itai
>>>>>>>>
>>>>>>>>
>>>>>>>>  ------------------------------
>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>
>>>>>>>>
>>>>>>>>  Hi,
>>>>>>>>
>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if it
>>>>>>>> is out of resources. So you should consider using max-spout-pending for
>>>>>>>> spout level throttling.
>>>>>>>>
>>>>>>>>
>>>>>>>>  @Itai,
>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't have
>>>>>>>> this problem?
>>>>>>>>
>>>>>>>>  Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>
>>>>>>>>  @Michael,
>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>
>>>>>>>>  This is my execute code. Could it cause any
>>>>>>>> side effects in my topology?
>>>>>>>>
>>>>>>>>
>>>>>>>>  @Override
>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>
>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>         @Override
>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>             return response;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>             t.printStackTrace();
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>>  thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>>
>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>
>>>>>>>>
>>>>>>>>   Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
>>>>>>>> [email protected]
>>>>>>>>
>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>  Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Any non-blocking bolt does not push back on the previous bolt if
>>>>>>>>> it is out of resources. So you should consider using
>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Regards,
>>>>>>>>>
>>>>>>>>> Itai
>>>>>>>>>  ------------------------------
>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>> *To:* [email protected]
>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>
>>>>>>>>>    Hi All,
>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>
>>>>>>>>>   I am using the AsyncHttpClient library (
>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is based
>>>>>>>>> on NIO channels, to get the response asynchronously without
>>>>>>>>> allocating a thread for each request.
>>>>>>>>>
>>>>>>>>>  I was wondering whether this implementation could cause any
>>>>>>>>> side effects within a Storm bolt.
>>>>>>>>>
>>>>>>>>>  thank you.
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
