Hi,
> 1. OutputCollector access must always be synchronized if you are using it from
> multiple threads

Could you give me an example of the side effects that might occur if I use
multiple threads and don't synchronize the OutputCollector?
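As a rough illustration of what can go wrong (this is not Storm's collector code): the sketch below uses a hypothetical `UnsafeCollector` backed by a plain `ArrayList`, which, like OutputCollector, is not safe for concurrent use. Many "response handler" threads emit at the same moment. Without a lock, emits can be silently lost or the list's internal state corrupted. Wrapping each call in `synchronized (collector)` makes the result exact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a collector that is not thread-safe.
// It only buffers emitted values in a plain ArrayList.
class UnsafeCollector {
    private final List<Object> emitted = new ArrayList<>();

    void emit(Object value) { emitted.add(value); } // ArrayList.add is not thread-safe

    int size() { return emitted.size(); }
}

class CollectorRace {
    /** Runs `callbacks` concurrent "response handlers" that each emit once. */
    static int run(int callbacks, boolean synchronize) {
        UnsafeCollector collector = new UnsafeCollector();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        CountDownLatch start = new CountDownLatch(1); // release all handlers at once
        for (int i = 0; i < callbacks; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    return;
                }
                try {
                    if (synchronize) {
                        synchronized (collector) { collector.emit(id); }
                    } else {
                        collector.emit(id); // racy: emits can be lost or the list corrupted
                    }
                } catch (RuntimeException e) {
                    // e.g. ArrayIndexOutOfBoundsException from a concurrent resize
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (collector) { return collector.size(); }
    }
}
```

`CollectorRace.run(10000, true)` should always return 10000. `run(10000, false)` often returns less on a multi-core machine, but the outcome is nondeterministic. In a real bolt the corruption would happen inside Storm's own collector state, so the visible symptom could be different and harder to trace back to the missing lock.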

2. I guess I didn't phrase my question well.
I do need the emit to execute after I get any response (success or failure), but
what about the "never got a response" case? The tuple will never be emitted to
the next bolt, and I will never be able to tell what happened to that request.
Of course I can manage state before and after execute, but then it becomes
ugly, doesn't it?
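One way to handle the "never got a response" case without scattering state across execute is to race the request against a timer and let whichever finishes first emit, exactly once. This is a sketch under assumptions: the HTTP call is modeled as a `CompletableFuture<String>` (in the real bolt it would be completed from `onCompleted`/`onThrowable`), and `emit` is a hypothetical callback standing in for a synchronized collector emit (plus an ack, with BaseRichBolt).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Emits exactly one outcome per request: the response, the failure, or a
 * timeout, whichever happens first. Later outcomes are ignored.
 */
class OnceWithTimeout {
    // One shared daemon timer thread for all pending requests.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "request-timeout");
                t.setDaemon(true);
                return t;
            });

    static void watch(CompletableFuture<String> request, long timeoutMs, Consumer<String> emit) {
        AtomicBoolean done = new AtomicBoolean(false); // guards the single emit

        ScheduledFuture<?> timeout = TIMER.schedule(() -> {
            if (done.compareAndSet(false, true)) {
                emit.accept("TIMEOUT"); // the service never answered in time
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);

        request.whenComplete((body, error) -> {
            if (done.compareAndSet(false, true)) {
                timeout.cancel(false); // the response won the race
                emit.accept(error == null ? "OK:" + body : "FAILED:" + error.getMessage());
            }
        });
    }
}
```

AsyncHttpClient can also be configured with its own request timeout, which would then surface through `onThrowable`; the exact config method depends on the library version. Separately, with anchored tuples in a BaseRichBolt, Storm's topology-wide message timeout still acts as a last resort by failing the tuple back to the spout.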

thanks.


> On Dec 17, 2014, at 7:02 PM, Nathan Leung <[email protected]> wrote:
> 
> 1. Yes OutputCollector access must always be synchronized if you are using it 
> from multiple threads
> 
> 2. I guess it depends.  If you need the web server response before the tuple 
> gets sent to the next bolt, then you have to put something in your error 
> handling to emit it.  If you don't need the response before sending to the 
> next bolt, then instead of putting emit in onCompleted you can put it in 
> execute after you set up the HTTP request.
> 
> On Wed, Dec 17, 2014 at 11:57 AM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi Nathan,
> First of all, thank you for responding. It's really helpful and is
> increasing my knowledge of this technology.
> 
> If you prefer, I can reply to you via the group list; I just didn't want
> to annoy the user list.
> 
> I hope it's OK if I keep asking a few more questions about my scenario:
> 
> 
> From your answers I understand that I should add the emit line inside the
> response callbacks:
> 
> Check the following code (I tried to keep it short):
> 
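>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>         @Override
>         public Response onCompleted(Response response) throws Exception {
>             // OutputCollector is not thread-safe: lock around every emit
>             synchronized (basicOutputCollector) {
>                 basicOutputCollector.emit(new Values(pushMessage));
>             }
>             return response;
>         }
> 
>         @Override
>         public void onThrowable(Throwable t) {
>             synchronized (basicOutputCollector) {
>                 basicOutputCollector.emit(new Values(pushMessage));
>             }
>         }
>     });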
> 1. If I use BaseBasicBolt, I shouldn't have to care about acking, only about
> synchronizing the basicOutputCollector.
> 
> Is that right?
> 
> 2. How would I handle a scenario where my web service never responds to a
> request? How could I "time out" and emit to my next bolt? I mean, if I never
> get a response, the emit line will never be executed.
> 
> Thanks again,
> Idan
> 
> 
> 2014-12-17 18:02 GMT+02:00 Nathan Leung <[email protected] 
> <mailto:[email protected]>>:
> 1. My examples were assuming BaseRichBolt, not BaseBasicBolt
> 
> 2. If you are using BaseBasicBolt, then you wouldn't ack manually
> 
> 3. If you use BaseBasicBolt then your tuple will be acked immediately after 
> execute and you won't have to worry about timeouts due to HTTP response time.
> 
> On Wed, Dec 17, 2014 at 2:36 AM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> @Nathan:
> 
> Right now the requirement is to emit the tuple on both failure and completion,
> without retrying (as I wrote, we are emitting into a Persister bolt).
> 
> 1. If I am using BaseBasicBolt, can I still ack manually?
> 2. Why should I ack manually in the final response callbacks while using
> BaseBasicBolt?
> 
> * "..This means that if you don't get a response your tuple will get timed 
> out.."
> 
> 
> 3. I didn't understand how I can set a timeout for a specific bolt (so that if
> I never get a response, I'll be able to handle this scenario without
> necessarily retrying, maybe just by logging).
> 
> Thank you.
> 
> 
> 
> 
> 
> 2014-12-16 20:06 GMT+02:00 Nathan Leung <[email protected] 
> <mailto:[email protected]>>:
> It's required that you synchronize on the output collector.  If you need more
> throughput through this bolt and are limited by synchronizing on the
> output collector, you will need to add more tasks.
> 
> I would emit / ack on the completion of the HTTP request.  This means that if 
> you don't get a response your tuple will get timed out (and if you have 
> reliable message handling the fail method will be called on the spout).  If 
> you catch the error condition yourself you can always explicitly fail the 
> tuple using the output collector, or retry manually.
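Explicitly failing or retrying can be sketched like this. `AckSink` is a hypothetical stand-in for the ack/fail methods of Storm's OutputCollector, and the request is modeled as a `Supplier<CompletableFuture<String>>` rather than a real AsyncHttpClient call. On error the request is retried up to `maxAttempts` times in total; after that the tuple is failed explicitly, so with reliable processing the spout's `fail` method gets a chance to replay or log it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for the ack/fail half of Storm's OutputCollector. */
interface AckSink<T> {
    void ack(T tuple);
    void fail(T tuple);
}

class RetryThenFail {
    /** Sends the request; retries on error up to maxAttempts in total, then fails. */
    static <T> void send(T tuple, Supplier<CompletableFuture<String>> request,
                         int maxAttempts, AckSink<T> sink) {
        attempt(tuple, request, 1, maxAttempts, sink);
    }

    private static <T> void attempt(T tuple, Supplier<CompletableFuture<String>> request,
                                    int attemptNo, int maxAttempts, AckSink<T> sink) {
        request.get().whenComplete((body, error) -> {
            if (error == null) {
                synchronized (sink) { sink.ack(tuple); }            // success
            } else if (attemptNo < maxAttempts) {
                attempt(tuple, request, attemptNo + 1, maxAttempts, sink); // manual retry
            } else {
                synchronized (sink) { sink.fail(tuple); }           // give up explicitly
            }
        });
    }
}
```

In a real BaseRichBolt you would also emit (anchored to the input tuple) before acking on success. Retrying immediately, as here, can hammer a struggling service; adding a delay between attempts is a common refinement.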
> 
> I would reiterate that how you handle the failure states depends on the 
> requirements of your system.  I'm assuming that you want to make use of 
> Storm's at-least-once semantics, but as mentioned, if your requirements don't 
> demand these semantics, you can simplify your logic (and possibly even use 
> BaseBasicBolt as you were before).
> 
> On Tue, Dec 16, 2014 at 1:01 PM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> 1. I'm afraid that synchronizing on basicOutputCollector will hurt
> performance. What do you think?
> 
> 2. What do you think about emitting to the next bolt in onCompleted of the
> async response, as shown below? That means the topology will continue to the
> next bolt only after I get a response from the external service (what happens
> if I never get a response? I need to solve this).
> 
> On Dec 16, 2014 7:50 PM, "Nathan Leung" <[email protected] 
> <mailto:[email protected]>> wrote:
> You would care if you wanted to replay the tuple tree in case of failure.  
> Consider what happens if your bolt thread fails, or if the worker process 
> fails.  Will you recover properly?  Is it a requirement to recover properly 
> in this scenario?  (it might not be, sometimes it's ok to lose data).
> 
> Regarding synchronization and acking, I would ack after emit, and synchronize 
> access to emit and ack only, not the entire execute method.  Probably 
> something like synchronized(basicOutputCollector) { ... }.
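Locking only around the collector calls, rather than the whole execute method, might look roughly like the sketch below. `RecordingCollector` is a hypothetical stand-in for OutputCollector (it just logs calls into a non-thread-safe list), and an `ExecutorService` plays the role of AsyncHttpClient's callback threads. `execute` only dispatches and returns; each completion takes the collector lock once and does emit-then-ack inside it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical stand-in for OutputCollector: records emit/ack calls. */
class RecordingCollector {
    final List<String> log = new ArrayList<>(); // deliberately not thread-safe

    void emit(String anchor, List<Object> values) { log.add("emit:" + anchor); }

    void ack(String anchor) { log.add("ack:" + anchor); }
}

class AsyncBoltSketch {
    private final RecordingCollector collector;
    private final ExecutorService httpThreads; // simulates the NIO callback threads

    AsyncBoltSketch(RecordingCollector collector, ExecutorService httpThreads) {
        this.collector = collector;
        this.httpThreads = httpThreads;
    }

    /** Not synchronized: it only dispatches the (simulated) request and returns. */
    CompletableFuture<Void> execute(String tuple) {
        return CompletableFuture
                .supplyAsync(() -> "response for " + tuple, httpThreads)
                .thenAccept(body -> {
                    // Lock only around the collector calls: emit first, then ack.
                    synchronized (collector) {
                        collector.emit(tuple, Arrays.<Object>asList(tuple, body));
                        collector.ack(tuple);
                    }
                });
    }
}
```

The lock is held only for two short calls, so contention is usually small compared with HTTP latency. If it does become a bottleneck, adding more tasks spreads the work, since each task's bolt instance receives its own collector in `prepare()`.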
> 
> On Tue, Dec 16, 2014 at 12:29 PM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> " It is better to manually control the acking so that it is not done until 
> the tuple is fully processed"
> Why would I care about that? I get the response in onCompleted or
> onThrowable; if anything happens, I'll take care of it there.
> 
> BTW: it turns out I do have another bolt after this call. It's a 'Persistor'
> bolt, which is responsible for persisting the answer into a datasource. I
> guess that makes things even more complicated:
> 
> Are you talking about something like this?
> 
> @Override
> public synchronized void execute(Tuple tuple, final BasicOutputCollector basicOutputCollector) {
> 
>     final PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>     final String messageId = pushMessage.getMessageId();
>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>         @Override
>         public Response onCompleted(Response response) throws Exception {
>             // println, not printf: a '%' in the body would break a format string
>             System.out.println("messageId=" + messageId
>                     + " responseBody=" + response.getResponseBody());
>             basicOutputCollector.emit(new Values(pushMessage));
>             return response;
>         }
> 
>         @Override
>         public void onThrowable(Throwable t) {
>             t.printStackTrace();
>         }
>     });
> }
> 
> Note the synchronized keyword and the basicOutputCollector.emit(new
> Values(pushMessage)); line I added.
> 
> Where would you add the acking (if needed)?
> thank you
> 
> 
> 2014-12-16 19:15 GMT+02:00 Nathan Leung <[email protected] 
> <mailto:[email protected]>>:
> Sorry, I missed that.  I would recommend you use IRichBolt (or BaseRichBolt). 
>  In BaseBasicBolt, you will ack once execute is finished, but this is no 
> guarantee that your HTTP Request has actually completed.  It is better to 
> manually control the acking so that it is not done until the tuple is fully 
> processed.  This will allow you to catch scenarios where the request fails 
> and replay accordingly, and will also allow for proper "back pressure" by 
> preventing too many simultaneous HTTP requests.  To elaborate, in 
> BaseBasicBolt, since you ack the tuple immediately, you will indicate to the 
> spout that you are done processing this tree (since your bolt is at the end 
> of the topology), and the spout will be able to emit additional tuples.  
> However, you may still be processing tuples that you have acked, since you 
> are doing HTTP requests asynchronously.  This may cause you to have more 
> requests in flight than you anticipated.
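The timing problem described above can be seen in a small model. `AutoAckModel` is a hypothetical simplification of BaseBasicBolt's auto-ack behavior (ack as soon as execute returns); it is not Storm's code, and the HTTP responses are modeled as `CompletableFuture`s completed by hand. All three tuples are acked while all three requests are still in flight, so a spout-side limit such as max-spout-pending would already consider them done.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of auto-acking: ack as soon as execute() returns. */
class AutoAckModel {
    final List<String> events = new CopyOnWriteArrayList<>();
    final AtomicInteger inFlight = new AtomicInteger();    // requests not yet answered
    final AtomicInteger maxInFlight = new AtomicInteger();

    /** The "bolt" body: starts an async request and returns immediately. */
    void execute(String tuple, CompletableFuture<String> response) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        response.thenAccept(body -> {
            inFlight.decrementAndGet();
            events.add("response:" + tuple);
        });
    }

    /** The wrapper: runs execute, then acks unconditionally. */
    void runWithAutoAck(String tuple, CompletableFuture<String> response) {
        execute(tuple, response);
        events.add("ack:" + tuple); // the spout now treats this tuple tree as done
    }
}
```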
> 
> On Tue, Dec 16, 2014 at 11:48 AM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi Nathan,
> 
> Excuse me if I'm missing something here, but I don't understand why I need to
> ack at all. I am extending BaseBasicBolt, and we know that it automatically
> provides anchoring and acking for us.
> 
> So you're saying that although I am using BaseBasicBolt, I should take care
> of the acking (and synchronizing it) because I am using async response logic?
> 
> thanks,
> Idan.
> 
> 2014-12-16 18:23 GMT+02:00 Nathan Leung <[email protected] 
> <mailto:[email protected]>>:
> OK.  :)  If you use a separate thread, just make sure to wrap all accesses to 
> the OutputCollector object with synchronized.  From what I see it doesn't 
> look like you use OutputCollector, but it may be best, for example, to ack
> the messages in your response handler.  If many response handlers can be 
> running simultaneously, then you would need to synchronize the calls to ack() 
> as they are made on the OutputCollector.
> 
> On Tue, Dec 16, 2014 at 11:16 AM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> Oh, I know what synchronized is all about. I just wasn't sure whether the
> synchronization in my Storm bolt applies only to the execute method.
> 
> 
> 
> 2014-12-16 18:08 GMT+02:00 Nathan Leung <[email protected] 
> <mailto:[email protected]>>:
> you should look up the synchronized keyword in java:
> 
> http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html 
> <http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html>
> http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html 
> <http://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html>
> 
> On Tue, Dec 16, 2014 at 9:50 AM, Idan Fridman <[email protected] 
> <mailto:[email protected]>> wrote:
> Thanks for your response, Itai. I understand you.
> How would you modify my code to make sure I am synchronizing, or making the
> calls from the same thread?
> 
> 2014-12-16 16:43 GMT+02:00 Itai Frenkel <[email protected] 
> <mailto:[email protected]>>:
> Idan,
> 
> Suppose you have 1000 concurrent tuples ... and the spout does not throttle
> traffic. That means the last bolt would be handling 1000 concurrent
> requests. Now suppose you have 100,000 concurrent tuples.... Eventually the
> operating system or the NIO buffers would run out of resources. You would
> be better off with throttling.
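The spout-level fix suggested in this thread is max-spout-pending (in Storm's Java API, `Config.setMaxSpoutPending(int)`), which only helps if tuples are acked after the response arrives. As a different, bolt-local technique, the sketch below caps how many requests one bolt instance has in flight using `java.util.concurrent.Semaphore`. `InFlightLimiter` and the `Supplier<CompletableFuture<String>>` request model are hypothetical. Blocking in `dispatch` stalls the bolt's executor thread, which is the intended back-pressure here but also delays every other tuple that executor would process.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Caps the number of HTTP requests this bolt instance has in flight. */
class InFlightLimiter {
    private final Semaphore permits;
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();

    InFlightLimiter(int maxInFlight) { this.permits = new Semaphore(maxInFlight); }

    /** Blocks the calling (executor) thread while maxInFlight requests are outstanding. */
    CompletableFuture<String> dispatch(Supplier<CompletableFuture<String>> request) {
        permits.acquireUninterruptibly();
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<String> f;
        try {
            f = request.get();
        } catch (RuntimeException e) {
            inFlight.decrementAndGet(); // the request never started
            permits.release();
            throw e;
        }
        return f.whenComplete((body, error) -> {
            inFlight.decrementAndGet();
            permits.release(); // free the slot on success or failure
        });
    }
}
```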
> 
> The output collector is the object on which you call "ack" or "fail" for the
> tuple. You probably call them from a future callback. Make sure that all of 
> these callbacks are called from the same thread, or are synchronized.
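The "same thread" option, as opposed to locking, could look roughly like this: the async callbacks only enqueue their outcome on a `ConcurrentLinkedQueue`, and the bolt's executor thread drains the queue inside `execute`, so it is the only thread that ever touches the collector. `SingleThreadedEmitter` is hypothetical, and its `collectorCalls` list stands in for real `emit`/`ack`/`fail` calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Alternative to locking: callbacks only enqueue results, and the bolt's
 * executor thread is the only thread that touches the collector.
 */
class SingleThreadedEmitter {
    private final Queue<String> completed = new ConcurrentLinkedQueue<>(); // thread-safe hand-off
    final List<String> collectorCalls = new ArrayList<>(); // stand-in for collector emit/ack/fail

    /** Called on the executor thread for every input tuple. */
    void execute(String tuple, CompletableFuture<String> response) {
        drain(); // first flush results that arrived since the last tuple
        response.whenComplete((body, error) ->
                completed.add(error == null ? "emit+ack:" + tuple : "fail:" + tuple));
    }

    /** Only ever called from the executor thread, so no lock is needed. */
    void drain() {
        String result;
        while ((result = completed.poll()) != null) {
            collectorCalls.add(result);
        }
    }
}
```

The trade-off is latency: completed responses wait until the next tuple arrives. In Storm, tick tuples (configured with `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS`) are one way to make sure the queue is drained periodically even when input is idle.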
> 
> Itai
> 
> From: Idan Fridman <[email protected] <mailto:[email protected]>>
> Sent: Tuesday, December 16, 2014 3:58 PM
> To: [email protected] <mailto:[email protected]>
> Subject: Re: Using AsyncHttpReuqest inside a Bolt
>  
> Hi,
> Any non-blocking bolt does not push back on the previous bolt if it is out of 
> resources. So you should consider using max-spout-pending for spout level 
> throttling.
> 
> @Itai,
> My async bolt is the last bolt in the chain, so I guess I don't have this
> problem?
> 
> Keep in mind you'll need to synchronize the OutputCollector when your NIO 
> response workers handle the returned requests as OutputCollector is not 
> thread safe.
> 
> @Michael,
> I am not sure how the OutputCollector relates to my issue.
> 
> This is my execute code. Could this code cause any side effects in my
> topology?
> 
> 
> @Override
> public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
> 
>     PushMessage pushMessage = (PushMessage) tuple.getValueByField("pushMessage");
>     final String messageId = pushMessage.getMessageId();
>     asyncHttpClient.preparePost("some_url").execute(new AsyncCompletionHandler<Response>() {
>         @Override
>         public Response onCompleted(Response response) throws Exception {
>             // println, not printf: a '%' in the body would break a format string
>             System.out.println("messageId=" + messageId
>                     + " responseBody=" + response.getResponseBody());
>             return response;
>         }
> 
>         @Override
>         public void onThrowable(Throwable t) {
>             t.printStackTrace();
>         }
>     });
> }
> thanks.
> 
> 
> 2014-12-15 19:30 GMT+02:00 Michael Rose <[email protected] 
> <mailto:[email protected]>>:
> Keep in mind you'll need to synchronize the OutputCollector when your NIO 
> response workers handle the returned requests as OutputCollector is not 
> thread safe.
> 
> Michael Rose (@Xorlev <https://twitter.com/xorlev>)
> Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
> [email protected] <mailto:[email protected]>
> 
> On Mon, Dec 15, 2014 at 9:20 AM, Itai Frenkel <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi,
> 
> 
> Any non-blocking bolt does not push back on the previous bolt if it is out of 
> resources. So you should consider using max-spout-pending for spout level 
> throttling.
> 
> Regards,
> Itai
> From: Idan Fridman <[email protected] <mailto:[email protected]>>
> Sent: Monday, December 15, 2014 10:19 AM
> To: [email protected] <mailto:[email protected]>
> Subject: Using AsyncHttpReuqest inside a Bolt
>  
> Hi All,
> My bolt needs to dispatch async requests to a remote service.
> 
> I am using the AsyncHttpClient library
> (https://github.com/AsyncHttpClient/async-http-client
> <https://github.com/AsyncHttpClient/async-http-client>), which is based on NIO
> channels and gets responses asynchronously without allocating a thread for
> each request.
> 
> I was wondering whether this implementation could cause any side effects
> within a Storm bolt.
> 
> thank you.
> 
> 
> 
> 
