Hi Nathan,
First of all, thank you for responding. It's really helpful and is
improving my knowledge of this technology.

If you prefer, I can reply to you via the group list; I just didn't
want to annoy the user list.

I hope it's OK if I keep asking a few more questions about my scenario:


From your answers I understand that I should add the emit line within the
response clauses.

Please check the following code (I tried to keep it short):

        @Override
        public Response onCompleted(Response response) throws Exception {
            basicOutputCollector.emit(new Values(pushMessage));

            return response;
        }

        @Override
        public void onThrowable(Throwable t) {
            basicOutputCollector.emit(new Values(pushMessage));
        }
    });

1. If I use BaseBasicBolt, I shouldn't have to care about acking, only
about synchronizing on the basicOutputCollector.

Is that right?
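
To make the question concrete, here is a minimal sketch of locking only around the collector call. It deliberately avoids the Storm and AsyncHttpClient APIs so it runs on a plain JDK: FakeCollector is a hypothetical stand-in for the real collector, and the thread pool stands in for the HTTP client's response threads. Whether emitting from a callback after execute has returned is safe with BaseBasicBolt is a separate question that this sketch does not answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the output collector; NOT the Storm API.
final class FakeCollector {
    // ArrayList is deliberately not thread-safe, like the real collector.
    private final List<String> emitted = new ArrayList<>();

    void emit(String value) {
        emitted.add(value);
    }

    int size() {
        return emitted.size();
    }
}

final class SynchronizedEmitSketch {
    // Simulates `callbacks` response handlers firing on several threads
    // and returns how many values reached the collector.
    static int emitFromCallbacks(int callbacks) {
        final FakeCollector collector = new FakeCollector();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < callbacks; i++) {
            final String message = "pushMessage-" + i;
            pool.submit(() -> {
                // Lock only around the collector call, not the whole callback.
                synchronized (collector) {
                    collector.emit(message);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (collector) {
            return collector.size();
        }
    }

    public static void main(String[] args) {
        System.out.println(emitFromCallbacks(1000));
    }
}
```

The lock is held only for the emit itself, so slow work inside a callback (parsing the response body, logging) does not serialize the other callbacks.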

2. How should I handle a scenario where my web service never responds to a
request? How could I "time out" and emit to my next bolt? I mean, if I never
get a response, the emit line will never be executed.
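
One possible shape for such a timeout, as a sketch only: race the response against a timer and emit exactly once, whichever finishes first. It assumes Java 9 or later (for CompletableFuture.completeOnTimeout) and uses no Storm or AsyncHttpClient classes. The `emitted` list is a hypothetical stand-in for the emit to the next bolt, and the "TIMEOUT" and "ERROR" markers are made up for illustration. In a real bolt the onCompleted and onThrowable callbacks would complete the future.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

final class TimeoutEmitSketch {
    // Hypothetical stand-in for emitting to the next bolt.
    static final List<String> emitted = new CopyOnWriteArrayList<>();

    // Emits exactly once per message: with the response body, with "ERROR"
    // if the request failed, or with "TIMEOUT" if nothing arrived in time.
    static CompletableFuture<Void> emitOnResponseOrTimeout(
            CompletableFuture<String> response, String pushMessage, long timeoutMillis) {
        return response
                // Completes the future with a marker value if it is still pending.
                .completeOnTimeout("TIMEOUT", timeoutMillis, TimeUnit.MILLISECONDS)
                // Maps a failed request to a marker value instead of propagating.
                .handle((body, error) -> error != null ? "ERROR" : body)
                .thenAccept(outcome -> emitted.add(pushMessage + ":" + outcome));
    }

    public static void main(String[] args) {
        // A request that never gets a response: the timeout path emits.
        CompletableFuture<String> neverAnswers = new CompletableFuture<>();
        emitOnResponseOrTimeout(neverAnswers, "msg-1", 100).join();

        // A request that already has a response: the normal path emits.
        emitOnResponseOrTimeout(CompletableFuture.completedFuture("200 OK"), "msg-2", 100).join();

        System.out.println(emitted);
    }
}
```

Because the timeout completes the same future the callbacks would complete, a late response after the timeout is ignored rather than emitted a second time.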

Thanks again,
Idan


2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected]>:
>
> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
>
> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
>
> 3. If you use BaseBasicBolt then your tuple will be acked immediately
> after execute and you won't have to worry about timeouts due to HTTP
> response time.
>
> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected]> wrote:
>>
>> @Nathan:
>>
>> Right now the requirement is to emit the tuple on failure and on completion
>> without retrying (as I wrote, we are emitting into a Persister bolt).
>>
>> 1. If I am using BaseBasicBolt, can I still ack manually?
>> 2. Why should I ack manually in the final response clauses while using
>> BaseBasicBolt?
>>
>>
>> * "..This means that if you don't get a response your tuple will get
>> timed out.."
>>
>>
>> 3. I didn't get how I can set a timeout for a specific bolt (so in my
>> case, if I never get a response, I'll be able to handle this scenario
>> without necessarily retrying, maybe just logging).
>>
>> Thank you.
>>
>>
>>
>>
>>
>> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected]>:
>>>
>>> it's required that you synchronize on the output collector.  If you need
>>> wider throughput through this bolt and are limited by synchronizing on the
>>> output collector, you will need to add more tasks.
>>>
>>> I would emit / ack on the completion of the HTTP request.  This means
>>> that if you don't get a response your tuple will get timed out (and if you
>>> have reliable message handling the fail method will be called on the
>>> spout).  If you catch the error condition yourself you can always
>>> explicitly fail the tuple using the output collector, or retry manually.
>>>
>>> I would reiterate that how you handle the failure states depends on the
>>> requirements of your system.  I'm assuming that you want to make use of
>>> storm's at least once semantics, but as mentioned if your requirements
>>> don't demand these semantics, you can simplify your logic (and possibly
>>> even use BaseBasicBolt as you were before).
>>>
>>
>> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected]>
>> wrote:
>>>
>>> 1. I'm afraid that synchronizing on basicOutputCollector will hurt
>>> performance. What do you think?
>>>
>>> 2. What do you think about emitting to the next bolt in onCompleted of
>>> the async response, as shown below? That means the topology will continue
>>> to the next bolt only after I get a response from the external service
>>> (what happens if I never get a response? I need to solve this).
>>> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected]> wrote:
>>>
>>>> You would care if you wanted to replay the tuple tree in case of
>>>> failure.  Consider what happens if your bolt thread fails, or if the worker
>>>> process fails.  Will you recover properly?  Is it a requirement to recover
>>>> properly in this scenario?  (it might not be, sometimes it's ok to lose
>>>> data).
>>>>
>>>> Regarding synchronization and acking, I would ack after emit, and
>>>> synchronize access to emit and ack only, not the entire execute method.
>>>> Probably something like synchronized(basicOutputCollector) { ... }.
>>>>
>>>> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected]>
>>>> wrote:
>>>>
>>>>> "It is better to manually control the acking so that it is not done
>>>>> until the tuple is fully processed"
>>>>> Why would I care about that? I get the response in onCompleted or
>>>>> onThrowable; if anything happens, I'll take care of it there.
>>>>>
>>>>> By the way, apparently I do have another bolt after this call. It's a
>>>>> 'Persistor' bolt, which is responsible for persisting the answer into a
>>>>> datasource.
>>>>> I guess that makes things even more complicated:
>>>>>
>>>>> Are you talking about something like this?
>>>>>
>>>>> @Override
>>>>> public synchronized void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>
>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>         @Override
>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>             String innerMessageId = messageId;
>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>
>>>>>             basicOutputCollector.emit(new Values(pushMessage));
>>>>>
>>>>>             return response;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onThrowable(Throwable t) {
>>>>>             t.printStackTrace();
>>>>>         }
>>>>>     });
>>>>> }
>>>>>
>>>>>
>>>>> Note the synchronized keyword and the basicOutputCollector.emit(new
>>>>> Values(pushMessage)); additions.
>>>>>
>>>>> Where would you add the acking (if needed)?
>>>>> Thank you
>>>>>
>>>>>
>>>>> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>
>>>>>> Sorry, I missed that.  I would recommend you use IRichBolt (or
>>>>>> BaseRichBolt).  In BaseBasicBolt, you will ack once execute is finished,
>>>>>> but this is no guarantee that your HTTP request has actually completed.
>>>>>> It is better to manually control the acking so that it is not done until
>>>>>> the tuple is fully processed.  This will allow you to catch scenarios
>>>>>> where the request fails and replay accordingly, and will also allow for
>>>>>> proper "back pressure" by preventing too many simultaneous HTTP requests.
>>>>>> To elaborate, in BaseBasicBolt, since you ack the tuple immediately, you
>>>>>> will indicate to the spout that you are done processing this tree (since
>>>>>> your bolt is at the end of the topology), and the spout will be able to
>>>>>> emit additional tuples.  However, you may still be processing tuples that
>>>>>> you have acked, since you are doing HTTP requests asynchronously.  This
>>>>>> may cause you to have more requests in flight than you anticipated.
>>>>>
>>>>>
>>>>> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Nathan,
>>>>>>
>>>>>> Excuse me if I'm missing something here. I don't understand why I need
>>>>>> to ack at all. I am extending BaseBasicBolt, and we know that it
>>>>>> automatically provides anchoring and acking for us.
>>>>>>
>>>>>> So you are saying that although I am using BaseBasicBolt, I should take
>>>>>> care of the acking (and synchronize it) because I am using async
>>>>>> response logic?
>>>>>>
>>>>>> thanks,
>>>>>> Idan.
>>>>>>
>>>>>> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>
>>>>>>> OK.  :)  If you use a separate thread, just make sure to wrap all
>>>>>>> accesses to the OutputCollector object with synchronized.  From what I
>>>>>>> see it doesn't look like you use OutputCollector, but maybe, for example,
>>>>>>> it's best to ack the messages in your response handler.  If many response
>>>>>>> handlers can be running simultaneously, then you would need to
>>>>>>> synchronize the calls to ack() as they are made on the OutputCollector.
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Oh, I know what synchronized is all about. I just wasn't sure whether
>>>>>>> the synchronization in my Storm bolt concerns only the execute method.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected]>:
>>>>>>>>
>>>>>>>> you should look up the synchronized keyword in java:
>>>>>>>>
>>>>>>>>
>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
>>>>>>>>
>>>>>>>> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html
>>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for your response, Itai. I understand.
>>>>>>>> How would you modify the above code to make sure I am
>>>>>>>> synchronizing or making the calls from the same thread?
>>>>>>>>
>>>>>>>> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected]>:
>>>>>>>>>
>>>>>>>>>  Idan,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Consider you have 1000 concurrent tuples ... and the spout does
>>>>>>>>> not throttle traffic. It means that the last bolt would be
>>>>>>>>> handling 1000 concurrent requests. Now consider you have 100,000
>>>>>>>>> concurrent tuples.... Eventually the operating system or the NIO
>>>>>>>>> buffer would exhaust its resources. You would have been better off
>>>>>>>>> with throttling.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  The output collector is the object on which you "ack" or
>>>>>>>>> "fail" the tuple. You probably call them from a future callback. Make
>>>>>>>>> sure that all of these callbacks are called from the same thread, or
>>>>>>>>> are synchronized.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Itai
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  ------------------------------
>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>> *Sent:* Tuesday, December 16, 2014 3:58 PM
>>>>>>>>> *To:* [email protected]
>>>>>>>>> *Subject:* Re: Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Hi,
>>>>>>>>>
>>>>>>>>> Any non-blocking bolt does not push back on the previous bolt if
>>>>>>>>> it is out of resources. So you should consider using
>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  @Itai,
>>>>>>>>> My async bolt is the last bolt in the chain, so I guess I don't
>>>>>>>>> have this problem?
>>>>>>>>>
>>>>>>>>>  Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>
>>>>>>>>>  @Michael,
>>>>>>>>> I am not sure how the OutputCollector relates to my issue.
>>>>>>>>>
>>>>>>>>>  This is my execute code. Could it cause any
>>>>>>>>> side effects in my topology?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>
>>>>>>>>>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>>>>>>>>>     final String messageId = pushMessage.getMessageId();
>>>>>>>>>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>>>>>>>>>         @Override
>>>>>>>>>         public Response onCompleted(Response response) throws Exception {
>>>>>>>>>             String innerMessageId = messageId;
>>>>>>>>>             System.out.printf("\n messageId=" + innerMessageId + "responseBody=" + response.getResponseBody());
>>>>>>>>>             return response;
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void onThrowable(Throwable t) {
>>>>>>>>>             t.printStackTrace();
>>>>>>>>>         }
>>>>>>>>>     });
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>  thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected]>:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Keep in mind you'll need to synchronize the OutputCollector when
>>>>>>>>>> your NIO response workers handle the returned requests as
>>>>>>>>>> OutputCollector is not thread safe.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>   Michael Rose (@Xorlev <https://twitter.com/xorlev>)
>>>>>>>>> Senior Platform Engineer, FullContact
>>>>>>>>> <http://www.fullcontact.com/>
>>>>>>>>> [email protected]
>>>>>>>>>
>>>>>>>>> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>  Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  Any non-blocking bolt does not push back on the previous bolt
>>>>>>>>>> if it is out of resources. So you should consider using
>>>>>>>>>> max-spout-pending for spout level throttling.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  Regards,
>>>>>>>>>>
>>>>>>>>>> Itai
>>>>>>>>>>  ------------------------------
>>>>>>>>>> *From:* Idan Fridman <[email protected]>
>>>>>>>>>> *Sent:* Monday, December 15, 2014 10:19 AM
>>>>>>>>>> *To:* [email protected]
>>>>>>>>>> *Subject:* Using AsyncHttpReuqest inside a Bolt
>>>>>>>>>>
>>>>>>>>>>    Hi All,
>>>>>>>>>> My bolt needs to dispatch async requests to a remote service.
>>>>>>>>>>
>>>>>>>>>>   I am using the AsyncHttpClient library (
>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client), which is
>>>>>>>>>> based on NIO channels, to get the response asynchronously without
>>>>>>>>>> allocating a thread for each request.
>>>>>>>>>>
>>>>>>>>>>  I was wondering whether this implementation could cause any
>>>>>>>>>> side effects within a Storm bolt?
>>>>>>>>>>
>>>>>>>>>>  thank you.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
