@Michael, @Nathan, @Itai, and anyone else,

I have modified the code according to your suggestions. I am posting it here so
that anyone else who deals with this case can see it.

Please send me your comments on it, if any:


public class MyClass extends BaseRichBolt {

..

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    asyncHttpClient = new AsyncHttpClient();
    outputCollector = collector;

}


@Override
public void execute(final Tuple tuple) {

    final PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
    String template = pushMessage.getMessageBody();
    String url = "https://service_url/";
    synchronized (outputCollector) {
        asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response response) throws Exception {
                outputCollector.emit(tuple, new Values(pushMessage));
                outputCollector.ack(tuple);
                return response;
            }

            @Override
            public void onThrowable(Throwable t) {
                t.printStackTrace();
                outputCollector.emit(tuple, new Values(pushMessage));
                outputCollector.ack(tuple);
            }
        });
    }

}

}

Please check the synchronized block in execute and let me know if I should pay
attention to anything else.
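
For anyone who finds this thread later: the synchronized block in execute above
is held only while the request is dispatched. The handler methods run later, on
the HTTP client's threads, after that block has already been exited. Nathan's
earlier advice was to synchronize the emit and ack calls themselves. Below is a
minimal, self-contained sketch of that idea. It is not Storm code: FakeCollector
is a hypothetical stand-in for OutputCollector, and a CompletableFuture on a
thread pool stands in for the AsyncHttpClient callback. All names in it are
assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackSyncSketch {

    // Hypothetical stand-in for Storm's OutputCollector; like the real one,
    // it is NOT thread safe (plain ArrayLists, no internal locking).
    public static class FakeCollector {
        private final List<String> emitted = new ArrayList<>();
        private final List<String> acked = new ArrayList<>();

        void emit(String value) { emitted.add(value); }
        void ack(String tupleId) { acked.add(tupleId); }

        public int emittedCount() { return emitted.size(); }
        public int ackedCount() { return acked.size(); }
    }

    // Called from callback threads: the lock covers only emit + ack.
    static void emitAndAck(FakeCollector collector, String tupleId) {
        synchronized (collector) {
            collector.emit(tupleId);
            collector.ack(tupleId);
        }
    }

    public static FakeCollector run(int tuples) {
        FakeCollector collector = new FakeCollector();
        // The pool stands in for the HTTP client's I/O threads.
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < tuples; i++) {
            final String tupleId = "tuple-" + i;
            // Dispatch without holding any lock; the "request" runs on the pool.
            CompletableFuture<Void> done = CompletableFuture
                    .supplyAsync(() -> "response for " + tupleId, pool)
                    // Runs on success or failure, like onCompleted / onThrowable.
                    .handle((response, error) -> {
                        emitAndAck(collector, tupleId);
                        return null;
                    });
            pending.add(done);
        }
        // Wait for every simulated callback before reading the counters.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return collector;
    }

    public static void main(String[] args) {
        FakeCollector collector = run(1000);
        System.out.println("emitted=" + collector.emittedCount()
                + " acked=" + collector.ackedCount());
    }
}
```

In the bolt, the equivalent change would be to drop the synchronized block
around preparePost(...).execute(...) and instead wrap the emit/ack pair inside
onCompleted and onThrowable in synchronized (outputCollector) { ... }.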

Thank you.


2014-12-18 11:56 GMT+02:00 Idan Fridman <[email protected]>:
>
> @Michael,
> Why do you think that synchronizing will avoid NPEs? (Because Storm won't
> release the OutputCollector instance until all acks have been received?)
>
> 2014-12-17 19:34 GMT+02:00 Michael Rose <[email protected]>:
>>
>> 1) Could you give me an example of any side effects that might occur
>> while using multiple threads in case I won’t synchronise OutputCollector?
>>
>> You'll get NPEs.
>>
>> 2) Timeouts
>>
>> AsyncHttpClient has timeouts built into the client config.
>> https://github.com/AsyncHttpClient/async-http-client/blob/036bc733ba8527d4700df048ba304f01241488ca/api/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java#L626-L636
>>
>>
>>
>>
>>
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected]
>
> On Wed, Dec 17, 2014 at 10:14 AM, Nathan Leung <[email protected]> wrote:
>>
>> 1. You will get an exception.  I forgot the exact type, but a call to
>> OutputCollector will typically be on the stack trace.
>>
>> 2.  I'm not familiar enough with AsyncHttpClient to say.  I know with
>> Apache HttpClient you can set a timeout which puts an upper bound on the
>> amount of time an HTTP request will take.  Maybe you can do something
>> similar in AsyncHttpClient.
>>
>> On Wed, Dec 17, 2014 at 12:10 PM, Idan Fridman <[email protected]>
>> wrote:
>>>
>>> Hi,
>>> 1.  OutputCollector access must always be synchronized if you are using
>>> it from multiple threads
>>>  Could you give me an example of any side effects that might occur
>>> while using multiple threads in case I won’t synchronise OutputCollector?
>>>
>>> 2. I guess I didn’t formulate my question right.
>>>  I do need the emit to run after I get any response (success or failure),
>>> but what about the “never-got-response” case? Nothing will ever be emitted
>>> to the next bolt, and I will never be able to tell what happened to that
>>> request. Of course I can manage state before and after execute, but then it
>>> becomes ugly, doesn’t it?
>>>
>>> thanks.
>>>
>>>
>>>
>>> On Dec 17, 2014, at 7:02 PM, Nathan Leung <[email protected]> wrote:
>>>
>>> 1. Yes OutputCollector access must always be synchronized if you are
>>> using it from multiple threads
>>>
>>> 2. I guess it depends.  If you need the web server response before the
>>> tuple gets sent to the next bolt, then you have to put something in your
>>> error handling to emit it.  If you don't need the response before sending
>>> to the next bolt, then instead of putting emit in onCompleted you can put
>>> it in execute after you setup the HTTP request.
>>>
>>> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman <[email protected]>
>>> wrote:
>>>>
>>>> Hi Nathan,
>>>> First of all, thank you for responding. It's really helpful
>>>> and is increasing my knowledge of this technology.
>>>>
>>>> If you prefer, I can reply to you via the group list; I just didn't
>>>> want to annoy the user list.
>>>>
>>>> I hope it's OK if I keep asking a few more questions about my
>>>> scenario:
>>>>
>>>>
>>>> From your answers I understand that I should add the emit line within the
>>>> response handlers.
>>>>
>>>> Check the following code (I tried to keep it short):
>>>>
>>>>     @Override
>>>>         public Response onCompleted(Response response) throws Exception {
>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>
>>>>             return response;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onThrowable(Throwable t) {
>>>>            basicOutputCollector.emit(new Values(pushMessage));
>>>>         }
>>>>     });
>>>>
>>>> 1. If I use BaseBasicBolt, I shouldn't need to care about acking, only
>>>> about synchronising the basicOutputCollector.
>>>>
>>>> Is that right?
>>>>
>>>> 2. How would I handle a scenario where my web service never responds
>>>> to a request? How could I "timeout" and emit to my next bolt? I mean, if I
>>>> never get a response, the emit line will never be executed.
>>>>
>>>> Thanks again,
>>>> Idan
>>>>
>>>>
>>>> 2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>
>>>>> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>>>>>
>>>>> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>>>>>
>>>>> 3. If you use BaseBasicBolt then your tuple will be acked immediately
>>>>> after execute and you won't have to worry about timeouts due to HTTP
>>>>> response time.
>>>>>
>>>>
>>>> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>>
>>>>> @Nathan:
>>>>>
>>>>> Right now the requirement is to emit the tuple in both onFail and onComplete,
>>>>> without retrying (as I wrote, we are emitting into a Persister bolt).
>>>>>
>>>>> 1. If I am using BaseBasicBolt, can I still ack manually?
>>>>> 2. Why should I ack manually in the final response handlers while using
>>>>> BaseBasicBolt?
>>>>>
>>>>>
>>>>> * "..This means that if you don't get a response your tuple will get
>>>>> timed out.."
>>>>>
>>>>>
>>>>> 3. I didn't understand how I can set a timeout for a specific bolt (so that,
>>>>> in my case, if I never get a response I'll be able to handle this scenario
>>>>> without necessarily retrying, maybe just logging).
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> it's required that you synchronize on the output collector.  If you
>>>>>> need wider throughput through this bolt and are limited by synchronizing
>>>>>> on the output collector, you will need to add more tasks.
>>>>>>
>>>>>> I would emit / ack on the completion of the HTTP request.  This means
>>>>>> that if you don't get a response your tuple will get timed out (and if
>>>>>> you have reliable message handling the fail method will be called on the
>>>>>> spout).  If you catch the error condition yourself you can always
>>>>>> explicitly fail the tuple using the output collector, or retry manually.
>>>>>>
>>>>>> I would reiterate that how you handle the failure states depends on
>>>>>> the requirements of your system.  I'm assuming that you want to make use
>>>>>> of storm's at least once semantics, but as mentioned if your requirements
>>>>>> don't demand these semantics, you can simplify your logic (and possibly
>>>>>> even use BaseBasicBolt as you were before).
>>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>>
>>>>>> 1. I am afraid that synchronizing on basicOutputCollector will harm
>>>>>> performance. What do you think?
>>>>>>
>>>>>> 2. What do you think about emitting to the next bolt in onComplete of the
>>>>>> async response, as shown below? That means the topology will
>>>>>> continue to the next bolt only after I get a response from the external
>>>>>> service (what happens if I never get a response? I need to solve this).
>>>>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>>>>>
>>>>>>> You would care if you wanted to replay the tuple tree in case of
>>>>>>> failure.  Consider what happens if your bolt thread fails, or if the
>>>>>>> worker process fails.  Will you recover properly?  Is it a requirement to
>>>>>>> recover properly in this scenario?  (it might not be, sometimes it's ok to
>>>>>>> lose data).
>>>>>>>
>>>>>>> Regarding synchronization and acking, I would ack after emit, and
>>>>>>> synchronize access to emit and ack only, not the entire execute method.
>>>>>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> "It is better to manually control the acking so that it is not
>>>>>>>> done until the tuple is fully processed"
>>>>>>>> Why would I care about that? I get the response in onCompleted or
>>>>>>>> onThrowable; if anything happens I'll take care of it there.
>>>>>>>>
>>>>>>>> By the way, I do have another bolt after this call. It's a
>>>>>>>> 'Persistor' bolt which is responsible for persisting the answer into a
>>>>>>>> datasource. I guess that makes things even more complicated.
>>>>>>>>
>>>>>>>> Are you talking about something like this?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>
>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>         @Override
>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>
>>>>>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>>>>>
>>>>>>>>             return response;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>             t.printStackTrace();
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Note the synchronized and the basicOutputCollector.emit(new
>>>>>>>> Values(pushMessage)); additions.
>>>>>>>>
>>>>>>>> Where would you add the acking (if needed)?
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>
>>>>>>>>> Sorry, I missed that.  I would recommend you use IRichBolt (or
>>>>>>>>> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is
>>>>>>>>> finished, but this is no guarantee that your HTTP Request has actually
>>>>>>>>> completed.  It is better to manually control the acking so that it is not
>>>>>>>>> done until the tuple is fully processed.  This will allow you to catch
>>>>>>>>> scenarios where the request fails and replay accordingly, and will also
>>>>>>>>> allow for proper "back pressure" by preventing too many simultaneous HTTP
>>>>>>>>> requests.  To elaborate, in BaseBasicBolt, since you ack the tuple
>>>>>>>>> immediately, you will indicate to the spout that you are done processing
>>>>>>>>> this tree (since your bolt is at the end of the topology), and the spout
>>>>>>>>> will be able to emit additional tuples.  However, you may still be
>>>>>>>>> processing tuples that you have acked, since you are doing HTTP requests
>>>>>>>>> asynchronously.  This may cause you to have more requests in flight than
>>>>>>>>> you anticipated.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hi Nathan,
>>>>>>>>>
>>>>>>>>> Excuse me if I am missing something here. I don't understand why I need
>>>>>>>>> to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>>>>> automatically provides anchoring
>>>>>>>>> and acking for us.
>>>>>>>>>
>>>>>>>>> So you are saying that although I am using BaseBasicBolt, I should take
>>>>>>>>> care of the acking (and synchronize it) because I am using async response
>>>>>>>>> logic?
>>>>>>>>>
>>>>>>>>> thanks,
>>>>>>>>> Idan.
>>>>>>>>>
>>>>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>>>>>>>>> accesses to the OutputCollector object with synchronized.  From what
>>>>>>>>>> I see it doesn't look like you use OutputCollector, but maybe, for
>>>>>>>>>> example, it's best to ack the messages in your response handler.  If many
>>>>>>>>>> response handlers can be running simultaneously, then you would need to
>>>>>>>>>> synchronize the calls to ack() as they are made on the OutputCollector.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>>>>>> the synchronization in my Storm bolt applies only to the
>>>>>>>>>> execute method.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>>>>
>>>>>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>>>>
>>>>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your response, Itai. I understand.
>>>>>>>>>>> How would you modify the above code to make sure I am
>>>>>>>>>>> synchronizing or making calls from the same thread?
>>>>>>>>>>>
>>>>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>>>>>
>>>>>>>>>>>> Idan,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>>>>>>> concurrent tuples.... Eventually the operating system or the NIO buffer
>>>>>>>>>>>> would exhaust its resources. You would have been better off with
>>>>>>>>>>>> throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The output collector is the object on which you "ack" or
>>>>>>>>>>>> "fail" the tuple. You probably call them from a future callback.
>>>>>>>>>>>> Make sure that all of these callbacks are called from the same thread,
>>>>>>>>>>>> or are synchronized.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Itai
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  ------------------------------
>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  Hi,
>>>>>>>>>>>>  Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  @Itai,
>>>>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>>>>> have this problem?
>>>>>>>>>>>>
>>>>>>>>>>>>  Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>>  @Michael,
>>>>>>>>>>>> I am not sure how the OutputCollector relates to my issue?
>>>>>>>>>>>>
>>>>>>>>>>>>  This is my execute code. Could it cause any
>>>>>>>>>>>> side effects in my topology?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  @Override
>>>>>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>>
>>>>>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>>>>>             return response;
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>>>>>             t.printStackTrace();
>>>>>>>>>>>>         }
>>>>>>>>>>>>     });
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>  thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <
>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector
>>>>>>>>>>>>> when your NIO response workers handle the returned requests as
>>>>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>>>>> Senior Platform Engineer, FullContact
>>>>>>>>>>>> <http://www.fullcontact.com/>
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Itai
>>>>>>>>>>>>>  ------------------------------
>>>>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>>>>
>>>>>>>>>>>>>    Hi All,
>>>>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>>>>
>>>>>>>>>>>>>   I am using the AsyncHttpClient library (
>>>>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>>>>> based on NIO channels, to get the response asynchronously without
>>>>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  I was wondering whether this implementation could cause any
>>>>>>>>>>>>> side effects within a Storm bolt?
>>>>>>>>>>>>>
>>>>>>>>>>>>>  thank you.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>
