[ 
https://issues.apache.org/jira/browse/STORM-737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14540918#comment-14540918
 ] 

ASF GitHub Bot commented on STORM-737:
--------------------------------------

Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101446038
  
    @d2r 
    First of all, I'm sorry that I expressed myself poorly.
    What I meant is that your approach follows some of Nathan's comments, but
    not all of them.
    Your patch also resolves STORM-737, but it drops some tuples when
    task->node+port changes, and those tuples can be saved.
    
    @nathanmarz said that 861a92e introduced a regression, which means the code
    before 861a92e was applied (0666c41387fc11c0422b26ab27ebc38c30fe26af) was right.
    AFAIK, mk-transfer-fn looks inside task->node+port, whereas that lookup
    should happen under the read lock; that is the source of the regression.
    If you look at the changes to mk-transfer-tuples-handler in that commit,
    you can see that I'm trying to revert it.
    The current PR takes the same logic (yes, the same) from the old code, but
    works with TransferDrainer.
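
To make the locking point concrete, here is a minimal Java sketch, not Storm's actual code. It assumes the refresh path swaps the mappings and closes dropped connections under the write lock, while the send path does both lookups inside the read lock. `LockedSendSketch`, `Conn`, `reassign`, and `send` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a worker's outbound state; none of these names are Storm's API.
class LockedSendSketch {
    // Minimal fake connection that records sends and refuses them once closed.
    static class Conn {
        final List<String> sent = new ArrayList<>();
        boolean closed = false;

        void send(int task, String tuple) {
            if (closed) throw new IllegalStateException("send on closed connection");
            sent.add(task + ":" + tuple);
        }

        void close() { closed = true; }
    }

    private final ReentrantReadWriteLock endpointSocketLock = new ReentrantReadWriteLock();
    private Map<Integer, String> taskToNodePort = new HashMap<>();
    private Map<String, Conn> nodePortToSocket = new HashMap<>();

    // Refresh path (assumed): swap both maps and close dropped connections under the write lock.
    void reassign(Map<Integer, String> newTasks, Map<String, Conn> newSockets) {
        endpointSocketLock.writeLock().lock();
        try {
            for (Map.Entry<String, Conn> e : nodePortToSocket.entrySet()) {
                if (!newSockets.containsKey(e.getKey())) e.getValue().close();
            }
            taskToNodePort = new HashMap<>(newTasks);
            nodePortToSocket = new HashMap<>(newSockets);
        } finally {
            endpointSocketLock.writeLock().unlock();
        }
    }

    // Send path: both lookups happen inside the read lock, so they see one consistent snapshot.
    boolean send(int task, String tuple) {
        endpointSocketLock.readLock().lock();
        try {
            String nodePort = taskToNodePort.get(task);
            if (nodePort == null) return false; // task currently has no assignment
            Conn conn = nodePortToSocket.get(nodePort);
            if (conn == null) return false;     // no connection for that destination
            conn.send(task, tuple);
            return true;
        } finally {
            endpointSocketLock.readLock().unlock();
        }
    }
}
```

If the task lookup were done before taking the read lock, a `reassign` could run between the lookup and the send, and the send could then reach a connection that was just closed.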
    
    * PR
    ```
    (defn mk-transfer-tuples-handler [worker]
      (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (TransferDrainer.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
        (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.add drainer packets)
            
            (when batch-end?
              (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket
                      task->node+port @task->node+port]
                  (.send drainer task->node+port node+port->socket)))
              (.clear drainer))))))
    ```
    
    * Old (the version Nathan said was right)
    
https://github.com/apache/storm/blob/0666c41387fc11c0422b26ab27ebc38c30fe26af/storm-core/src/clj/backtype/storm/daemon/worker.clj
    ```
    (defn mk-transfer-tuples-handler [worker]
      (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (ArrayList.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
        (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.addAll drainer packets)
            (when batch-end?
              (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket
                      task->node+port @task->node+port]
                  ;; consider doing some automatic batching here (would need to
                  ;; not be serialized at this point to remove per-tuple overhead)
                  ;; try using multipart messages ... first sort the tuples by
                  ;; the target node (without changing the local ordering)
                
                  (fast-list-iter [[task ser-tuple] drainer]
                    ;; TODO: consider write a batch of tuples here to every
                    ;; target worker
                    ;; group by node+port, do multipart send              
                    (let [node-port (get task->node+port task)]
                      (when node-port
                        (.send ^IConnection (get node+port->socket node-port)
                               task ser-tuple))
                        ))))
              (.clear drainer))))))
    ```
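
The comments in the old handler mention grouping tuples by node+port before sending. A minimal Java sketch of that idea follows, assuming packets are buffered by task id and only resolved to a destination when the batch is drained. `DrainSketch`, `Packet`, and `drain` are hypothetical names; this is not the actual TransferDrainer implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; this is not Storm's TransferDrainer.
class DrainSketch {
    static class Packet {
        final int task;
        final String tuple;

        Packet(int task, String tuple) {
            this.task = task;
            this.tuple = tuple;
        }
    }

    private final List<Packet> buffer = new ArrayList<>();

    // Buffer a packet by task id only; no destination is chosen yet.
    void add(Packet p) {
        buffer.add(p);
    }

    // Resolve each buffered packet against the mapping passed in at drain time,
    // grouping tuples per destination while keeping their local order.
    Map<String, List<String>> drain(Map<Integer, String> taskToNodePort) {
        Map<String, List<String>> byDestination = new HashMap<>();
        for (Packet p : buffer) {
            String nodePort = taskToNodePort.get(p.task);
            if (nodePort == null) continue; // no assignment: skipped, like (when node-port ...)
            byDestination.computeIfAbsent(nodePort, k -> new ArrayList<>()).add(p.tuple);
        }
        buffer.clear();
        return byDestination;
    }
}
```

Because the destination is looked up only inside `drain`, a tuple buffered before a reassignment is routed with the mapping that is current at drain time. In a real handler the `drain` call would sit inside the read-locked section.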
    
    So I'd like you to review the current PR and point out any issues, so that
    we can move forward together.


> Workers may try to send to closed connections
> ---------------------------------------------
>
>                 Key: STORM-737
>                 URL: https://issues.apache.org/jira/browse/STORM-737
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.2-incubating
>            Reporter: Derek Dagit
>
> There is a race condition in the worker code that can allow for a send() to 
> be called on a closed connection.
> [Discussion|https://github.com/apache/storm/pull/349#issuecomment-87778672]
> The assignment mapping from task -> node+port needs to be read and used in 
> the read lock when sending, so that an accurate mapping is used that does not 
> include any connections that are closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
