[
https://issues.apache.org/jira/browse/STORM-737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14540918#comment-14540918
]
ASF GitHub Bot commented on STORM-737:
--------------------------------------
Github user HeartSaVioR commented on the pull request:
https://github.com/apache/storm/pull/521#issuecomment-101446038
@d2r
First of all, I'm sorry; I made some mistakes in how I expressed this.
What I mean is, your approach follows some of Nathan's comments, but not all of them.
Your patch can also resolve STORM-737, but it drops some tuples when
task->node+port changes, whereas we could save them.
@nathanmarz said 861a92e introduced a regression, which means the code as it
stood before 861a92e was applied (0666c41387fc11c0422b26ab27ebc38c30fe26af) was right.
AFAIK, mk-transfer-fn looks up task->node+port without holding the read lock,
when that lookup should happen under the read lock; that is the point of the
regression.
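To illustrate the discipline involved (a minimal sketch, not Storm's actual
code: it uses java.util.concurrent locks directly instead of Storm's
read-locked macro, and all the names are made up):
```
;; Sketch of the locking discipline: snapshot task->node+port while
;; holding the read lock, so a writer (which closes sockets and updates
;; the mapping under the write lock) cannot race with the send.
(import '[java.util.concurrent.locks ReentrantReadWriteLock])

(def endpoint-socket-lock (ReentrantReadWriteLock.))
(def task->node+port (atom {1 ["node-a" 6700]}))  ;; made-up assignment

(defn send-with-read-lock [send-fn]
  (let [rl (.readLock endpoint-socket-lock)]
    (.lock rl)
    (try
      ;; deref INSIDE the lock: the snapshot cannot reference a
      ;; connection that a writer has already closed
      (send-fn @task->node+port)
      (finally
        (.unlock rl)))))
```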
You can see that I'm trying to revert it if you look at the changes to
mk-transfer-tuples-handler in that commit.
The current PR takes the same logic (yes, the same) as the old code, but
runs it through a TransferDrainer.
* PR
```
(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (TransferDrainer.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)
        ]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.add drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              (.send drainer task->node+port node+port->socket)))
          (.clear drainer))))))
```
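For intuition, the drainer's job at batch end is essentially to bucket the
buffered [task ser-tuple] pairs by destination and send one batch per
node+port. A rough sketch of that grouping step (a hypothetical helper,
not storm-core's actual TransferDrainer):
```
;; Hypothetical sketch of the grouping a drainer performs at batch end:
;; bucket buffered [task ser-tuple] pairs by the task's node+port, then
;; hand each bucket to the send function as one batch.
(defn drain [buffered task->node+port send-batch!]
  (doseq [[node+port pairs]
          (group-by (fn [[task _]] (get task->node+port task)) buffered)]
    (when node+port  ;; tasks with no current assignment are skipped
      (send-batch! node+port (map second pairs)))))
```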
* Old (the version Nathan said was right)
https://github.com/apache/storm/blob/0666c41387fc11c0422b26ab27ebc38c30fe26af/storm-core/src/clj/backtype/storm/daemon/worker.clj
```
(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (ArrayList.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)
        ]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.addAll drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to
              ;; not be serialized at this point to remove per-tuple overhead)
              ;; try using multipart messages ... first sort the tuples by
              ;; the target node (without changing the local ordering)
              (fast-list-iter [[task ser-tuple] drainer]
                ;; TODO: consider write a batch of tuples here to every
                ;; target worker
                ;; group by node+port, do multipart send
                (let [node-port (get task->node+port task)]
                  (when node-port
                    (.send ^IConnection (get node+port->socket node-port)
                           task ser-tuple))
                  ))))
          (.clear drainer))))))
```
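Side by side, the read-lock placement and the map snapshots are identical;
the only change is that the per-tuple send loop inside the lock is replaced
by the drainer's batched send.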
So, I'd like you to review the current PR, point out any issues, and let's work on it together.
> Workers may try to send to closed connections
> ---------------------------------------------
>
> Key: STORM-737
> URL: https://issues.apache.org/jira/browse/STORM-737
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 0.9.2-incubating
> Reporter: Derek Dagit
>
> There is a race condition in the worker code that can allow for a send() to
> be called on a closed connection.
> [Discussion|https://github.com/apache/storm/pull/349#issuecomment-87778672]
> The assignment mapping from task -> node+port needs to be read and used
> inside the read lock when sending, so that the mapping is accurate and
> does not include any closed connections.