GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2319#discussion_r138099459
--- Diff: storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---
@@ -961,14 +1015,25 @@
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
(doseq [[topology-id assignment] new-assignments
- :let [existing-assignment (get existing-assignments topology-id)
- topology-details (.getById topologies topology-id)]]
+ :let [existing-assignment (get existing-assignments topology-id)]]
(if (= existing-assignment assignment)
(log-debug "Assignment for " topology-id " hasn't changed")
(do
(log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
(.set-assignment! storm-cluster-state topology-id assignment)
)))
+
+ ;; grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
+ ;; because the number of existing assignments is small for every scheduling round,
+ ;; we expect to notify supervisors at almost the same time.
+ (->> new-assignments
+ (map (fn [[tid new-assignment]]
+ (let [existing-assignment (get existing-assignments tid)]
+ (assignment-changed-nodes existing-assignment new-assignment))))
+ (apply concat)
+ (into #{})
+ (notify-supervisors-assignments conf new-assignments))
--- End diff --
So then, if we can put in a thread pool with a timeout of 5 seconds or so
for each push attempt, we should be able to get the best of both worlds.
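
To make the suggestion concrete, here is a rough sketch of what a bounded push could look like, not the actual change for this PR. `BoundedPush`, `pushAll`, and the `push` callback are hypothetical names made up for illustration; only the `java.util.concurrent` calls are real. It assumes one pool thread per node, so every attempt starts immediately and the shared `invokeAll` deadline behaves like a per-attempt timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of Storm: pushes to all nodes in parallel
// and gives up on any attempt that has not finished by the deadline.
final class BoundedPush {

    /**
     * Runs push.accept(node) for every node on its own pool thread.
     * Returns the nodes whose push threw or was still running after timeoutMs,
     * in the same order as the input list.
     */
    static List<String> pushAll(List<String> nodes, Consumer<String> push, long timeoutMs)
            throws InterruptedException {
        List<String> failed = new ArrayList<>();
        if (nodes.isEmpty()) {
            return failed; // newFixedThreadPool rejects a size of 0
        }
        // Assumption: one thread per node, so no attempt waits in the queue.
        ExecutorService pool = Executors.newFixedThreadPool(nodes.size());
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (String node : nodes) {
                tasks.add(() -> {
                    push.accept(node);
                    return null;
                });
            }
            // invokeAll with a timeout cancels every task that is not done
            // when the deadline expires; futures come back in task order.
            List<Future<Void>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (CancellationException | ExecutionException e) {
                    // Timed out (cancelled) or the push itself threw.
                    failed.add(nodes.get(i));
                }
            }
            return failed;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Nodes that come back in the failed list would presumably just pick up the assignment on their next regular sync, so a slow or dead supervisor costs at most the timeout instead of blocking the scheduling loop.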
---