Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101446038

@d2r First of all, I'm sorry I made some mistakes in how I expressed this. What I mean is: your approach follows some of Nathan's comments, but not all of them. Your patch can also resolve STORM-737, but it drops some tuples when task->node+port changes, whereas we could save them.

@nathanmarz said 861a92e introduced a regression, which means the code before 861a92e was applied (0666c41387fc11c0422b26ab27ebc38c30fe26af) was correct. AFAIK mk-transfer-fn reads task->node+port without holding the read lock, while it should only be read under the read lock, and that is the point of the regression. You can see that I'm trying to revert it if you look at the changes to mk-transfer-tuples-handler in that commit. The current PR uses the same logic (yes, the same) as the old code, but via TransferDrainer.

* PR

```
(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (TransferDrainer.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.add drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              (.send drainer task->node+port node+port->socket)))
          (.clear drainer))))))
```

* Old (the version Nathan said was right)

https://github.com/apache/storm/blob/0666c41387fc11c0422b26ab27ebc38c30fe26af/storm-core/src/clj/backtype/storm/daemon/worker.clj

```
(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (ArrayList.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.addAll drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
              (fast-list-iter [[task ser-tuple] drainer]
                ;; TODO: consider write a batch of tuples here to every target worker
                ;; group by node+port, do multipart send
                (let [node-port (get task->node+port task)]
                  (when node-port
                    (.send ^IConnection (get node+port->socket node-port) task ser-tuple))))))
          (.clear drainer))))))
```

So, I'd like you to review the current PR, point out any issues you find, and let's work through them together.
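To make the difference between the two versions concrete: both snapshot task->node+port under the read lock, but the drainer buffers tuples between batch boundaries and groups them by destination node+port before sending, instead of calling `.send` once per tuple. Below is a minimal, hypothetical Java sketch of that grouping step; the class name, method names, and use of strings for payloads are all illustrative and are not Storm's actual TransferDrainer API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a drainer-style helper: buffer (task, serialized-tuple)
// pairs as they arrive, then, with a snapshot of task->node+port taken under the
// endpoint read lock, group buffered tuples by destination so each target
// node+port can be sent one batch rather than many per-tuple sends.
public class DrainerSketch {
    // Buffered (task id, serialized tuple payload) pairs between batch boundaries.
    private final List<Map.Entry<Integer, String>> buffered = new ArrayList<>();

    public void add(int task, String serTuple) {
        buffered.add(new AbstractMap.SimpleEntry<>(task, serTuple));
    }

    // Group buffered tuples by node+port using the task->node+port snapshot.
    // Tuples whose task has no known destination are dropped, mirroring the
    // (when node-port ...) guard in the Clojure code above.
    public Map<String, List<String>> groupByDestination(Map<Integer, String> taskToNodePort) {
        Map<String, List<String>> batches = new HashMap<>();
        for (Map.Entry<Integer, String> e : buffered) {
            String nodePort = taskToNodePort.get(e.getKey());
            if (nodePort != null) {
                batches.computeIfAbsent(nodePort, k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return batches;
    }

    public void clear() {
        buffered.clear();
    }
}
```

The key point for the lock discussion is that the routing snapshot is only consulted inside `groupByDestination`, i.e., at the moment the caller holds the read lock, not when tuples were first buffered.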