[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228166#comment-14228166 ]
ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user tedxia commented on the pull request:
https://github.com/apache/storm/pull/268#issuecomment-64870378

I tested this on our cluster.

###### Before adding this patch
```
2014-11-28 15:03:56 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st03.bj/10.2.201.68:49967... [48]
2014-11-28 15:04:00 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [49]
2014-11-28 15:04:04 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [50]
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-A/xxx.xxx.xxx.xxx:49967
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-A/xxx.xxx.xxx.xxx:49967..., timeout: 600000ms, pendings: 0
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
2014-11-28 15:04:08 o.a.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (50). Pinning to 29
2014-11-28 15:04:08 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [4000] the maxRetries [50]
2014-11-28 15:04:08 b.s.m.n.Client [INFO] New Netty Client, connect to B, 46389, config: , buffer_size: 5242880
2014-11-28 15:04:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-B/xxx.xxx.xxx.xxx:46389... [0]
2014-11-28 15:04:08 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-B/xxx.xxx.xxx.xxx:46389, [id: 0x0aa5eefe, /xxx.xxx.xxx.xxx:59716 => Netty-Client-B/xxx.xxx.xxx.xxx:46389]
```
The log shows the following sequence of events:

1. The worker sends messages to worker A, but A has already died;
2. The worker keeps trying to reconnect to worker A until the max retry count is exceeded;
3. Meanwhile the worker keeps sending messages to worker A, but send and connect are synchronized, so send waits until the reconnect finishes;
4. Meanwhile refresh-connections in worker.clj is running, but it cannot call close() on worker A's client until the send finishes, because it must acquire the endpoint-socket-lock write lock first (see the sketch after this list):
```
->> (write-locked (:endpoint-socket-lock worker)
      (reset! (:cached-task->node+port worker) (HashMap. my-assignment)))
    (doseq [endpoint remove-connections]
      (.close (get @(:cached-node+port->socket worker) endpoint)))
```
but at this point the send path is holding the endpoint-socket-lock as a read lock:
```
(disruptor/clojure-handler
  (fn [packets _ batch-end?]
    (.add drainer packets)
    (when batch-end?
->>   (read-locked endpoint-socket-lock
        (let [node+port->socket @node+port->socket]
          (.send drainer node+port->socket)))
      (.clear drainer))))
```
5. After the reconnect fails, the client calls close() and changes its status to Closed;
6. The pending send finally runs, but the client status is Closed, so the messages are dropped;
7. After the send finishes, refresh-connections proceeds: it first closes the client for worker A (already closed, so nothing is logged) and then connects to the new worker B.
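To make steps 3 and 4 concrete, here is a minimal, self-contained Java sketch of the lock interaction; it is not Storm's actual code, and the class name, thread names, monitor object, and timings are made up for illustration. A transfer thread holds the read lock while its send waits on the client monitor held by a slow reconnect loop, so the refresh thread cannot acquire the write lock to close the dead client:
```
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderingSketch {

    // stands in for worker.clj's endpoint-socket-lock
    private static final ReentrantReadWriteLock endpointSocketLock = new ReentrantReadWriteLock();
    // stands in for the monitor guarding the Netty client's synchronized send()/connect()
    private static final Object clientMonitor = new Object();

    public static void main(String[] args) throws InterruptedException {
        // Reconnect loop: holds the client monitor while retrying against the dead worker A.
        Thread reconnect = new Thread(() -> {
            synchronized (clientMonitor) {
                sleepQuietly(3000); // simulate the "[48] ... [50]" retry attempts
            }
        }, "client-reconnect");

        // Transfer thread: takes the read lock, then blocks on the client monitor
        // (send cannot run until the reconnect gives up).
        Thread transfer = new Thread(() -> {
            endpointSocketLock.readLock().lock();
            try {
                synchronized (clientMonitor) {
                    System.out.println("send() finally ran; the client is already Closed, messages dropped");
                }
            } finally {
                endpointSocketLock.readLock().unlock();
            }
        }, "transfer-thread");

        // Refresh thread: needs the write lock to close the dead client and connect to worker B,
        // but cannot get it while the transfer thread still holds the read lock.
        Thread refresh = new Thread(() -> {
            endpointSocketLock.writeLock().lock();
            try {
                System.out.println("refresh-connections got the write lock: close A, connect to B");
            } finally {
                endpointSocketLock.writeLock().unlock();
            }
        }, "refresh-connections");

        reconnect.start();
        sleepQuietly(100); // let the reconnect grab the client monitor first
        transfer.start();
        sleepQuietly(100); // let the transfer thread grab the read lock before refresh asks for the write lock
        refresh.start();

        reconnect.join();
        transfer.join();
        refresh.join();
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
In this sketch the refresh thread only prints after the simulated reconnect delay elapses and the transfer thread releases the read lock, which mirrors why closing worker A and connecting to worker B is delayed in the log above.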
###### After adding this patch
```
2014-11-28 14:22:33 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:45909... [0]
2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 4 messages...
2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 10 messages...
```
While the reconnect to worker A is in progress, sending messages to worker A fails fast and the pending messages are dropped:
```
// msgs iterator is invalid after this call, we cannot use it further
int msgCount = iteratorSize(msgs);

// the connection is down, drop pending messages
LOG.error("The Connection channel currently is not available, dropping pending " + msgCount + " messages...");
```

> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>             Fix For: 0.9.3-rc2
>
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] during the work on STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are blocking. My biggest concern around the blocking is in the case of a worker crashing. If a single worker crashes, this can block the entire topology from executing until that worker comes back up. In some cases I can see that being something that you would want. In other cases I can see speed being the primary concern, and some users would like to get partial data fast rather than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max limit on the buffering that is allowed before we block, or throw data away (which is what zeromq does)?
> {quote}
> If a worker crashes suddenly, how should we handle the messages that were supposed to be delivered to it?
> 1. Should we buffer all messages indefinitely?
> 2. Should we block message sending until the connection is restored?
> 3. Should we configure a buffer limit, buffer messages up to that limit, and then block once it is reached?
> 4. Should we neither block nor buffer too much, but instead drop the messages and rely on Storm's built-in failover mechanism?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)