[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228166#comment-14228166 ]

ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-64870378
  
    I tested this on our cluster.
    ###### Before applying this patch
    ```
    2014-11-28 15:03:56 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st03.bj/10.2.201.68:49967... [48]
    2014-11-28 15:04:00 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [49]
    2014-11-28 15:04:04 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-A/xxx.xxx.xxx.xxx:49967
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-A/xxx.xxx.xxx.xxx:49967..., timeout: 600000ms, pendings: 0
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 o.a.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (50). Pinning to 29
    2014-11-28 15:04:08 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [4000] the maxRetries [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] New Netty Client, connect to B, 46389, config: , buffer_size: 5242880
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-B/xxx.xxx.xxx.xxx:46389... [0]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-B/xxx.xxx.xxx.xxx:46389, [id: 0x0aa5eefe, /xxx.xxx.xxx.xxx:59716 => Netty-Client-B/xxx.xxx.xxx.xxx:46389]
    ```
    The log describes the following sequence of events:
    1. The worker sends messages to worker A, but A has already died;
    2. The worker keeps trying to reconnect to worker A until the maximum number of retries is exceeded;
    3. Meanwhile the worker keeps trying to send messages to worker A, but send and connect are synchronized, so send waits until the reconnect finishes (see the sketch after this list);
    4. Meanwhile refresh-connections in worker.clj is running, but it cannot call close() on worker A's connection until send finishes, because it must acquire the endpoint-socket-lock (write lock) first:
    ```
         ->>   (write-locked (:endpoint-socket-lock worker)
                    (reset! (:cached-task->node+port worker)
                            (HashMap. my-assignment)))
                  (doseq [endpoint remove-connections]
                    (.close (get @(:cached-node+port->socket worker) endpoint)))
    ```
    but right now send holds the endpoint-socket-lock (read lock):
    ```
      (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.add drainer packets)
            
            (when batch-end?
       ->>  (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket]
                  (.send drainer node+port->socket)))
              (.clear drainer))))))
    ```
    5. After the reconnect fails, the client calls close() and changes its status to Closed;
    6. send is called, but the client's status is Closed, so the message is dropped;
    7. After send finishes, refresh-connections is called; it first closes the connection to worker A (already closed, so nothing is logged) and then connects to the new worker B.
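
    The following is a minimal, hypothetical Java sketch of the interplay above; the names are invented and it is not Storm's actual Client code. Because send() and the reconnect loop share one monitor, a send issued during a long reconnect blocks until the loop gives up and marks the client closed, and only then is the message dropped (steps 2-6):
    ```
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class BlockingClientSketch {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private static final int MAX_RETRIES = 5;       // stand-in for the retry policy
        private static final long RETRY_SLEEP_MS = 100; // stand-in for the backoff policy

        // reconnect and send are synchronized on the same object
        public synchronized void reconnect() {
            for (int attempt = 0; attempt < MAX_RETRIES && !closed.get(); attempt++) {
                System.out.println("Reconnect started ... [" + attempt + "]");
                try {
                    TimeUnit.MILLISECONDS.sleep(RETRY_SLEEP_MS); // pretend each attempt fails
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            closed.set(true); // give up and mark the client closed, as in step 5
        }

        public synchronized void send(String message) {
            if (closed.get()) {
                // step 6: the client is already closed, so the message is dropped
                System.out.println("Client is being closed, dropping message: " + message);
                return;
            }
            System.out.println("Sent: " + message);
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingClientSketch client = new BlockingClientSketch();
            Thread reconnector = new Thread(client::reconnect);
            reconnector.start();
            Thread.sleep(50);        // let the reconnect loop grab the monitor first
            client.send("tuple-1");  // blocks here until reconnect() returns, then drops
            reconnector.join();
        }
    }
    ```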
    
    
    ###### After applying this patch
    ```
    2014-11-28 14:22:33 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:45909... [0]
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 4 messages...
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 10 messages...
    ```
    While the client is still reconnecting to worker A, sending messages to worker A fails and the pending messages are dropped:
    ```
          // msgs iterator is invalid after this call, we cannot use it further
          int msgCount = iteratorSize(msgs);

          // the connection is down, drop pending messages
          LOG.error("The Connection channel currently is not available, dropping pending " + msgCount + " messages...");
    ``` 
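
    For reference, here is a minimal sketch of that drop path in isolation (a hypothetical class, not the real backtype.storm.messaging.netty.Client): when the channel is not available, the pending batch is counted and dropped with an error log instead of blocking the sender, leaving recovery to Storm's built-in failover mechanism.
    ```
    import java.util.Arrays;
    import java.util.Iterator;

    public class DropWhenDisconnectedSketch {
        private volatile boolean channelAvailable = false; // flipped to true once the channel reconnects

        // Counts the messages in the iterator; the iterator is consumed by this call.
        private static int iteratorSize(Iterator<?> msgs) {
            int count = 0;
            while (msgs.hasNext()) {
                msgs.next();
                count++;
            }
            return count;
        }

        public void send(Iterator<String> msgs) {
            if (!channelAvailable) {
                // The connection is down: count and drop the pending messages rather than block.
                int msgCount = iteratorSize(msgs);
                System.err.println("The Connection channel currently is not available, dropping pending "
                        + msgCount + " messages...");
                return;
            }
            while (msgs.hasNext()) {
                System.out.println("Sent: " + msgs.next());
            }
        }

        public static void main(String[] args) {
            new DropWhenDisconnectedSketch()
                    .send(Arrays.asList("tuple-1", "tuple-2", "tuple-3", "tuple-4").iterator());
        }
    }
    ```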



> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>             Fix For: 0.9.3-rc2
>
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought 
> up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] 
> during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are 
> blocking. My biggest concern around the blocking is in the case of a worker 
> crashing. If a single worker crashes this can block the entire topology from 
> executing until that worker comes back up. In some cases I can see that being 
> something that you would want. In other cases I can see speed being the 
> primary concern and some users would like to get partial data fast, rather 
> than accurate data later.
> Could we make it configurable on a follow up JIRA where we can have a max 
> limit to the buffering that is allowed, before we block, or throw data away 
> (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were 
> supposed to be delivered to that worker?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is resumed?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and 
> block once it is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages 
> and rely on Storm's built-in failover mechanism?
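
For illustration only, strategies 3 and 4 above come down to a bounded buffer whose overflow policy is either "block" or "drop". The sketch below is hypothetical (names and capacity are invented, not an actual Storm configuration):
```
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedSendBufferSketch {
    private final BlockingQueue<String> pending;
    private final boolean dropWhenFull;

    public BoundedSendBufferSketch(int capacity, boolean dropWhenFull) {
        this.pending = new ArrayBlockingQueue<>(capacity);
        this.dropWhenFull = dropWhenFull;
    }

    // Called while the connection is down.
    public void buffer(String message) throws InterruptedException {
        if (dropWhenFull) {
            // strategy 4: never block; drop the message when the buffer is full
            if (!pending.offer(message)) {
                System.err.println("Buffer full, dropping message: " + message);
            }
        } else {
            // strategy 3: buffer up to the limit, then block the sender
            pending.put(message);
        }
    }

    // Called once the connection is re-established.
    public void flush() {
        String message;
        while ((message = pending.poll()) != null) {
            System.out.println("Sent buffered message: " + message);
        }
    }
}
```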



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
