1) Could you give me an example of side effects that might occur when
using multiple threads if I don't synchronise OutputCollector?

You'll get NullPointerExceptions (NPEs).

2) Timeouts

AsyncHttpClient has timeouts built into the client config.
https://github.com/AsyncHttpClient/async-http-client/blob/036bc733ba8527d4700df048ba304f01241488ca/api/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java#L626-L636
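If you also want a hard deadline enforced in the bolt itself, independent of the client's configured timeouts, here is a minimal JDK-only sketch (it assumes Java 9+ for CompletableFuture.orTimeout). The HTTP call is stood in for by a CompletableFuture; RequestDeadline, withDeadline and outcome are hypothetical helpers, not AsyncHttpClient or Storm API. The point is that the request always reaches a terminal state, so the "never got a response" case lands in the same error path as a failure, where you can log it, emit to the next bolt, or fail the tuple.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of AsyncHttpClient or Storm: makes sure a pending
// request future reaches a terminal state within a fixed deadline (Java 9+).
final class RequestDeadline {
    private RequestDeadline() {}

    // orTimeout completes the same future exceptionally with a TimeoutException
    // if it has not completed within timeoutMs, and returns that future.
    static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> request, long timeoutMs) {
        return request.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Classifies the outcome. join() blocks and is used here only so the result
    // can be inspected; in a bolt you would attach the handler and return.
    static <T> String outcome(CompletableFuture<T> request, long timeoutMs) {
        return withDeadline(request, timeoutMs)
                .handle((response, error) -> {
                    if (error == null) {
                        return "completed";
                    }
                    // Exceptions from dependent stages arrive wrapped in CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause() : error;
                    return (cause instanceof TimeoutException) ? "timed out" : "failed";
                })
                .join();
    }
}
```

In a bolt, the "completed" branch would emit the result, and the "timed out" and "failed" branches would log, emit a failure record, or fail the tuple.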





Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>

On Wed, Dec 17, 2014 at 10:14 AM, Nathan Leung wrote:
>
> 1. You will get an exception.  I forgot the exact type, but a call to
> OutputCollector will typically be on the stack trace.
>
> 2.  I'm not familiar enough with AsyncHttpClient to say.  I know with
> Apache HttpClient you can set a timeout which puts an upper bound on the
> amount of time an HTTP request will take.  Maybe you can do something
> similar in AsyncHttpClient.
>
> On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman wrote:
>>
>> Hi,
>> 1. OutputCollector access must always be synchronized if you are using
>> it from multiple threads.
>> Could you give me an example of side effects that might occur when
>> using multiple threads if I don't synchronise OutputCollector?
>>
>> 2. I guess I didn't phrase my question well.
>> I do need to emit after I get any response (success or failure), but what
>> about the "never got a response" case? The tuple will never be emitted to
>> the next bolt, and I will never be able to tell what happened to that
>> request. Of course I can manage state before and after execute, but then
>> it becomes ugly, doesn't it?
>>
>> thanks.
>>
>>
>>
>> On Dec 17, 2014, at 7:02 PM, Nathan Leung wrote:
>>
>> 1. Yes, OutputCollector access must always be synchronized if you are
>> using it from multiple threads.
>>
>> 2. I guess it depends.  If you need the web server response before the
>> tuple gets sent to the next bolt, then you have to put something in your
>> error handling to emit it.  If you don't need the response before sending
>> to the next bolt, then instead of putting emit in onCompleted you can put
>> it in execute after you set up the HTTP request.
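One way to do the error-handling emit without ever emitting twice (for example when a late response races a failure or timeout handler) is to route every outcome through a single guard. This is a sketch with hypothetical names: EmitOnce is not a Storm class, and the Consumer stands in for whatever emit call your bolt makes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical guard: whichever callback (success, failure or timeout) reports
// first performs the emit; later reports for the same request are ignored.
final class EmitOnce<T> {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Consumer<T> emit; // stands in for a (synchronized) collector emit

    EmitOnce(Consumer<T> emit) {
        this.emit = emit;
    }

    // Returns true only for the call that actually emitted.
    boolean finish(T value) {
        if (done.compareAndSet(false, true)) {
            emit.accept(value);
            return true;
        }
        return false;
    }
}
```

You would create one EmitOnce per request in execute and call finish(...) from both onCompleted and onThrowable (and from a timeout callback, if you add one). If the emit goes to a shared collector, the Consumer should still take the collector lock.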
>>
>> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman wrote:
>>>
>>> Hi Nathan,
>>> First of all, I want to thank you for responding. It's really helpful
>>> and is increasing my knowledge of this technology.
>>>
>>> If you prefer, I can reply to you via the group list; I just didn't
>>> want to annoy the user list.
>>>
>>> I hope it's OK if I keep asking a few more questions about my scenario:
>>>
>>>
>>> From your answers, I understand that I should add the emit line within
>>> the response callbacks:
>>>
>>> Check the following code (I tried to keep it short):
>>>
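>>>         @Override
>>>         public Response onCompleted(Response response) throws Exception {
>>>             // Runs on an AsyncHttpClient I/O thread, so guard the collector
>>>             synchronized (basicOutputCollector) {
>>>                 basicOutputCollector.emit(new Values(pushMessage));
>>>             }
>>>             return response;
>>>         }
>>>
>>>         @Override
>>>         public void onThrowable(Throwable t) {
>>>             synchronized (basicOutputCollector) {
>>>                 basicOutputCollector.emit(new Values(pushMessage));
>>>             }
>>>         }
>>>     });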
>>>
>>> 1. If I use BaseBasicBolt, I shouldn't need to care about acking, only
>>> about synchronising the basicOutputCollector.
>>>
>>> Is that right?
>>>
>>> 2. How would I handle a scenario where my web service never responds to
>>> a request? How could I "time out" and emit to my next bolt? If I never
>>> get a response, the emit line will never be executed.
>>>
>>> Thanks again,
>>> Idan
>>>
>>>
>>> 2014-12-17 18:02 GMT+02:00 Nathan Leung:
>>>>
>>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>>>>
>>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>>>>
>>>> 3. If you use BaseBasicBolt then your tuple will be acked immediately
>>>> after execute and you won't have to worry about timeouts due to HTTP
>>>> response time.
>>>>
>>>
>>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman wrote:
>>>>
>>>> @Nathan:
>>>>
>>>> Right now the requirement is to emit the tuple on both failure and
>>>> completion, without retrying (as I wrote, we are emitting into a
>>>> Persister bolt).
>>>>
>>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>>> 2. Why should I ack manually in the final response callbacks while using
>>>> BaseBasicBolt?
>>>>
>>>>
>>>> * "..This means that if you don't get a response your tuple will get
>>>> timed out.."
>>>>
>>>>
>>>> 3. I didn't understand how I can set a timeout for a specific bolt (so
>>>> that, if I never get a response, I can handle this scenario without
>>>> necessarily retrying, maybe just by logging).
>>>>
>>>> Thank you.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung:
>>>>>
>>>>> It's required that you synchronize on the output collector.  If you
>>>>> need higher throughput through this bolt and are limited by
>>>>> synchronizing on the output collector, you will need to add more tasks.
>>>>>
>>>>> I would emit / ack on the completion of the HTTP request.  This means
>>>>> that if you don't get a response your tuple will get timed out (and if you
>>>>> have reliable message handling the fail method will be called on the
>>>>> spout).  If you catch the error condition yourself you can always
>>>>> explicitly fail the tuple using the output collector, or retry manually.
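The "retry manually" option could look roughly like this. It is a sketch under assumptions: the HTTP call is modeled as a Supplier of CompletableFuture (a hypothetical stand-in for issuing the AsyncHttpClient request), there is no backoff between attempts, and Retry / withRetries are hypothetical names rather than any library API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical manual-retry helper: re-issues an async request up to maxAttempts
// times. When the last attempt fails, the returned future fails too, and the
// caller can then explicitly fail the tuple (or just log it).
final class Retry {
    private Retry() {}

    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> request, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(request, maxAttempts, 1, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> request, int maxAttempts,
                                    int attemptNo, CompletableFuture<T> result) {
        request.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);                              // success: stop retrying
            } else if (attemptNo < maxAttempts) {
                attempt(request, maxAttempts, attemptNo + 1, result); // issue the request again
            } else {
                result.completeExceptionally(error);                 // give up: caller fails the tuple
            }
        });
    }
}
```

Whether to retry at all is a requirements question; if you only need to record the failure, skip the retry and fail or log directly.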
>>>>>
>>>>> I would reiterate that how you handle the failure states depends on
>>>>> the requirements of your system.  I'm assuming that you want to make use
>>>>> of Storm's at-least-once semantics, but as mentioned, if your requirements
>>>>> don't demand these semantics, you can simplify your logic (and possibly
>>>>> even use BaseBasicBolt as you were before).
>>>>>
>>>>
>>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman wrote:
>>>>>
>>>>> 1. I'm afraid that synchronizing on basicOutputCollector will hurt
>>>>> performance. What do you think?
>>>>>
>>>>> 2. What do you think about emitting to the next bolt in onCompleted of
>>>>> the async response, as shown below? That means the topology will continue
>>>>> to the next bolt only after I get a response from the external service
>>>>> (what happens if I never get a response? I need to solve this).
>>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" wrote:
>>>>>
>>>>>> You would care if you wanted to replay the tuple tree in case of
>>>>>> failure.  Consider what happens if your bolt thread fails, or if the
>>>>>> worker process fails.  Will you recover properly?  Is it a requirement
>>>>>> to recover properly in this scenario?  (It might not be; sometimes it's
>>>>>> OK to lose data.)
>>>>>>
>>>>>> Regarding synchronization and acking, I would ack after emit, and
>>>>>> synchronize access to emit and ack only, not the entire execute method.
>>>>>> Probably something like synchronized(basicOutputCollector) { ... }.
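A minimal sketch of that pattern, assuming a BaseRichBolt-style collector where you anchor, ack, and fail yourself. To keep it self-contained, TupleCollector is a hypothetical stand-in for Storm's OutputCollector (emit/ack/fail), and ResponseHandler is a hypothetical class whose methods you would call from onCompleted and onThrowable. Only the collector calls sit inside the lock; nothing else in execute is synchronized.

```java
import java.util.List;

// Hypothetical stand-in for Storm's OutputCollector so the sketch compiles
// without Storm. Like OutputCollector, assume implementations are NOT thread-safe.
interface TupleCollector<T> {
    void emit(T anchor, List<Object> values);
    void ack(T input);
    void fail(T input);
}

// Called from the HTTP client's I/O threads once a response (or error) arrives.
final class ResponseHandler<T> {
    private final TupleCollector<T> collector;

    ResponseHandler(TupleCollector<T> collector) {
        this.collector = collector;
    }

    void onSuccess(T input, Object payload) {
        synchronized (collector) {
            collector.emit(input, List.of(payload)); // anchored to the input tuple
            collector.ack(input);                    // ack only after the emit
        }
    }

    void onFailure(T input) {
        synchronized (collector) {
            collector.fail(input); // lets the spout replay under at-least-once semantics
        }
    }
}
```

All threads lock on the same collector instance, so emit/ack pairs from different responses never interleave, while the HTTP requests themselves still run concurrently.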
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman wrote:
>>>>>>
>>>>>>> " It is better to manually control the acking so that it is not
>>>>>>> done until the tuple is fully processed"
>>>>>>> Why would I care about that? I get the response onCompleted or
>>>>>>> onThrowable if anything happens ill take care for it there.
>>>>>>>
>>>>>>> BTW: it turns out I do have another bolt after this call, a
>>>>>>> 'Persistor' bolt that is responsible for persisting the answer into a
>>>>>>> datasource. I guess that makes things even more complicated.
>>>>>>>
>>>>>>> Are you talking about something like this?
>>>>>>>
>>>>>>> @Override
>>>>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>         @Override
>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>             String innerMessageId = messageId;
>>>>>>>             System.out.println("messageId=" + innerMessageId
>>>>>>>                     + " responseBody=" + response.getResponseBody());
>>>>>>>
>>>>>>>             // Runs later on an AsyncHttpClient I/O thread; the synchronized
>>>>>>>             // on execute() has already been released and does not cover this call.
>>>>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>>>>
>>>>>>>             return response;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>             t.printStackTrace();
>>>>>>>         }
>>>>>>>     });
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Note the synchronized and basicOutputCollector.emit(new
>>>>>>> Values(pushMessage)); additions.
>>>>>>>
>>>>>>> Where would you add the acking (if needed)?
>>>>>>> Thank you.
>>>>>>>
>>>>>>>
>>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung:
>>>>>>>>
>>>>>>>> Sorry, I missed that.  I would recommend you use IRichBolt (or
>>>>>>>> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is finished,
>>>>>>>> but this is no guarantee that your HTTP request has actually completed.
>>>>>>>> It is better to manually control the acking so that it is not done until
>>>>>>>> the tuple is fully processed.  This will allow you to catch scenarios
>>>>>>>> where the request fails and replay accordingly, and will also allow for
>>>>>>>> proper "back pressure" by preventing too many simultaneous HTTP requests.
>>>>>>>> To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you
>>>>>>>> will indicate to the spout that you are done processing this tree (since
>>>>>>>> your bolt is at the end of the topology), and the spout will be able to
>>>>>>>> emit additional tuples.  However, you may still be processing tuples that
>>>>>>>> you have acked, since you are doing HTTP requests asynchronously.  This
>>>>>>>> may cause you to have more requests in flight than you anticipated.
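To make the back-pressure point concrete, here is a toy, single-threaded model of a spout capped by topology.max.spout.pending. It is not Storm code, and PendingWindow is a hypothetical name. Acking when execute returns, as BaseBasicBolt does, frees the pending slot immediately, so the number of HTTP requests in flight is no longer bounded by the cap; acking in the completion callback keeps it bounded.

```java
// Toy model of topology.max.spout.pending: the spout may only emit while fewer
// than maxPending tuples are un-acked. Not Storm code; names are hypothetical.
final class PendingWindow {
    private final int maxPending;
    private int pending;      // emitted but not yet acked
    private int httpInFlight; // requests started but not yet completed
    private int peakInFlight;

    PendingWindow(int maxPending) {
        this.maxPending = maxPending;
    }

    // Tries to emit a tuple and start its HTTP request. ackOnExecute models
    // BaseBasicBolt acking as soon as execute() returns.
    boolean emit(boolean ackOnExecute) {
        if (pending >= maxPending) {
            return false; // spout is throttled
        }
        pending++;
        httpInFlight++;
        peakInFlight = Math.max(peakInFlight, httpInFlight);
        if (ackOnExecute) {
            pending--; // acked before the response has arrived
        }
        return true;
    }

    // An HTTP response arrives; ackOnCompletion models acking in the callback.
    void complete(boolean ackOnCompletion) {
        httpInFlight--;
        if (ackOnCompletion) {
            pending--;
        }
    }

    int peakInFlight() {
        return peakInFlight;
    }
}
```

For example, with a cap of 10 and 100 tuples arriving before any response comes back, acking on execute lets all 100 requests be in flight at once, while acking on completion admits only 10 until responses start coming back.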
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman wrote:
>>>>>>>
>>>>>>>> Hi Nathan,
>>>>>>>>
>>>>>>>> Excuse me if I'm missing something here. I don't understand why I
>>>>>>>> need to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>>>> automatically provides anchoring and acking for us.
>>>>>>>>
>>>>>>>> So you're saying that although I am using BaseBasicBolt, I should take
>>>>>>>> care of the acking (and synchronizing it) because I am using async
>>>>>>>> response logic?
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> Idan.
>>>>>>>>
>>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung:
>>>>>>>>>
>>>>>>>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>>>>>>>> accesses to the OutputCollector object with synchronized.  From what
>>>>>>>>> I see it doesn't look like you use OutputCollector, but it may be best,
>>>>>>>>> for example, to ack the messages in your response handler.  If many
>>>>>>>>> response handlers can be running simultaneously, then you would need
>>>>>>>>> to synchronize the calls to ack() as they are made on the
>>>>>>>>> OutputCollector.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman wrote:
>>>>>>>>
>>>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>>>>> the synchronization in my Storm bolt concerns only the execute
>>>>>>>>> method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung:
>>>>>>>>>>
>>>>>>>>>> You should look up the synchronized keyword in Java:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>>>
>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your response, Itai. I understand.
>>>>>>>>>> How would you modify the above code to make sure I am synchronizing,
>>>>>>>>>> or making the calls from the same thread?
>>>>>>>>>>
>>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel:
>>>>>>>>>>>
>>>>>>>>>>> Idan,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Consider that you have 1000 concurrent tuples and the spout does
>>>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>>>> handling 1000 concurrent requests. Now consider that you have
>>>>>>>>>>> 100,000 concurrent tuples. Eventually the operating system or the
>>>>>>>>>>> NIO buffer would exhaust its resources. You would have been better
>>>>>>>>>>> off with throttling.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The output collector is the object on which you "ack" or "fail"
>>>>>>>>>>> the tuple. You probably call them from a future callback. Make sure
>>>>>>>>>>> that all of these callbacks are called from the same thread, or are
>>>>>>>>>>> synchronized.
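The "same thread" option can be sketched like this: the HTTP callbacks only enqueue results, and the bolt's own executor thread drains the queue (for example at the start of each execute call, or on a tick tuple) and makes every collector call itself, so no lock is needed around the collector. CallbackInbox is a hypothetical name, and the Consumer stands in for your emit/ack logic.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical inbox: NIO callback threads only enqueue; the bolt's executor
// thread drains it and performs all collector calls, keeping them single-threaded.
final class CallbackInbox<R> {
    private final Queue<R> completed = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread (e.g. inside onCompleted / onThrowable).
    void offer(R result) {
        completed.add(result);
    }

    // Call only from the bolt's executor thread, e.g. at the top of execute().
    // Returns the number of results handled.
    int drainTo(Consumer<R> handler) {
        int n = 0;
        R r;
        while ((r = completed.poll()) != null) {
            handler.accept(r); // e.g. collector.emit(...) then collector.ack(...)
            n++;
        }
        return n;
    }
}
```

The trade-off is latency: a completed result waits in the queue until the executor thread next runs, which is why a periodic tick tuple is often paired with this approach.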
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Itai
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  ------------------------------
>>>>>>>>>>> *From:* Idan Fridman
>>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Hi,
>>>>>>>>>>>  Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>> max-spout-pending for spout-level throttling.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  @Itai,
>>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>>>> have this problem?
>>>>>>>>>>>
>>>>>>>>>>>  Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>
>>>>>>>>>>>  @Michael,
>>>>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>>>>
>>>>>>>>>>>  This is my execute code. Could it cause any side effects in my
>>>>>>>>>>> topology?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>>>>             // Called on an AsyncHttpClient I/O thread once the response arrives
>>>>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>>>>             System.out.println("messageId=" + innerMessageId
>>>>>>>>>>>                     + " responseBody=" + response.getResponseBody());
>>>>>>>>>>>             return response;
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>>>>             // Called if the request fails
>>>>>>>>>>>             t.printStackTrace();
>>>>>>>>>>>         }
>>>>>>>>>>>     });
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>  thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose:
>>>>>>>>>>>>
>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>  Hi,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>> max-spout-pending for spout-level throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Itai
>>>>>>>>>>>>  ------------------------------
>>>>>>>>>>>> *From:* Idan Fridman
>>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>>
>>>>>>>>>>>> I am using the AsyncHttpClient library
>>>>>>>>>>>> (https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>>>> based on NIO channels to get responses asynchronously without
>>>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>>>
>>>>>>>>>>>> I was wondering whether this implementation could cause any side
>>>>>>>>>>>> effects within a Storm bolt?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>
