Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45133203
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -372,66 +389,50 @@
              ". State: " state
              ", Heartbeat: " (pr-str heartbeat))
             (shutdown-worker supervisor id)
    -        (if (:code-distributor supervisor)
    -          (.cleanup (:code-distributor supervisor) id))
    -        ))
    -
    -    (doseq [id (vals new-worker-ids)]
    -      (local-mkdirs (worker-pids-root conf id))
    -      (local-mkdirs (worker-heartbeats-root conf id)))
    -    (ls-approved-workers! local-state
    -          (merge
    -           (select-keys (ls-approved-workers local-state)
    -                        (keys keepers))
    -           (zipmap (vals new-worker-ids) (keys new-worker-ids))
    -           ))
    -
    -    ;; check storm topology code dir exists before launching workers
    -    (doseq [[port assignment] reassign-executors]
    -      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
    -            storm-id (:storm-id assignment)
    -            cached-assignment-info @(:assignment-versions supervisor)
    -            assignment-info (if (and (not-nil? cached-assignment-info) 
(contains? cached-assignment-info storm-id ))
    -                              (get cached-assignment-info storm-id)
    -                              (.assignment-info-with-version 
storm-cluster-state storm-id nil))
    -       storm-code-map (read-storm-code-locations assignment-info)
    -            master-code-dir (if (contains? storm-code-map :data) 
(storm-code-map :data))
    -            stormroot (supervisor-stormdist-root conf storm-id)]
    -        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists 
(File. stormroot)) (nil? master-code-dir))
    -          (download-storm-code conf storm-id master-code-dir supervisor 
download-lock))
             ))
    -
    -    (wait-for-workers-launch
    -     conf
    -     (dofor [[port assignment] reassign-executors]
    -            (let [id (new-worker-ids port)
    -                  storm-id (:storm-id assignment)
    -                  ^WorkerResources resources (:resources assignment)
    -                  mem-onheap (.get_mem_on_heap resources)]
    -              (try
    -                (log-message "Launching worker with assignment "
    -                             (pr-str assignment)
    -                             " for this supervisor "
    -                             (:supervisor-id supervisor)
    -                             " on port "
    -                             port
    -                             " with id "
    -                             id
    -                             )
    -                (launch-worker supervisor
    -                               (:storm-id assignment)
    -                               port
    -                               id
    -                               mem-onheap)
    -                (mark! supervisor:num-workers-launched)
    -                (catch java.io.FileNotFoundException e
    -                  (log-message "Unable to launch worker due to "
    -                               (.getMessage e)))
    -                (catch java.io.IOException e
    -                  (log-message "Unable to launch worker due to "
    -                               (.getMessage e))))
    -         id)))
    -    ))
    +    (let [valid-new-worker-ids
    +          (into {}
    +            (remove nil?
    +              (dofor [[port assignment] reassign-executors]
    +                (let [id (new-worker-ids port)
    +                      storm-id (:storm-id assignment)
    +                      ^WorkerResources resources (:resources assignment)
    +                      mem-onheap (.get_mem_on_heap resources)]
    +                  ;; This condition checks for required files exist before 
launching the worker
    +                  (if (required-topo-files-exist? conf storm-id)
    +                    (do
    +                      (log-message "Launching worker with assignment "
    +                        (pr-str assignment)
    +                        " for this supervisor "
    +                        (:supervisor-id supervisor)
    +                        " on port "
    +                        port
    +                        " with id "
    +                        id)
    +                      (local-mkdirs (worker-pids-root conf id))
    +                      (local-mkdirs (worker-heartbeats-root conf id))
    +                      (launch-worker supervisor
    +                        (:storm-id assignment)
    +                        port
    +                        id
    +                        mem-onheap)
    +                      [port id])
    --- End diff --
    
    Also, the resulting name of the bound var could be something like 
`id->port` or `valid-new-id->port`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to