[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009739#comment-15009739
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45133203
--- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
@@ -372,66 +389,50 @@
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)
- (if (:code-distributor supervisor)
- (.cleanup (:code-distributor supervisor) id))
- ))
-
- (doseq [id (vals new-worker-ids)]
- (local-mkdirs (worker-pids-root conf id))
- (local-mkdirs (worker-heartbeats-root conf id)))
- (ls-approved-workers! local-state
- (merge
- (select-keys (ls-approved-workers local-state)
- (keys keepers))
- (zipmap (vals new-worker-ids) (keys new-worker-ids))
- ))
-
- ;; check storm topology code dir exists before launching workers
- (doseq [[port assignment] reassign-executors]
- (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
- storm-id (:storm-id assignment)
- cached-assignment-info @(:assignment-versions supervisor)
- assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
- (get cached-assignment-info storm-id)
- (.assignment-info-with-version storm-cluster-state storm-id nil))
- storm-code-map (read-storm-code-locations assignment-info)
- master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
- stormroot (supervisor-stormdist-root conf storm-id)]
- (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
- (download-storm-code conf storm-id master-code-dir supervisor download-lock))
))
-
- (wait-for-workers-launch
- conf
- (dofor [[port assignment] reassign-executors]
- (let [id (new-worker-ids port)
- storm-id (:storm-id assignment)
- ^WorkerResources resources (:resources assignment)
- mem-onheap (.get_mem_on_heap resources)]
- (try
- (log-message "Launching worker with assignment "
- (pr-str assignment)
- " for this supervisor "
- (:supervisor-id supervisor)
- " on port "
- port
- " with id "
- id
- )
- (launch-worker supervisor
- (:storm-id assignment)
- port
- id
- mem-onheap)
- (mark! supervisor:num-workers-launched)
- (catch java.io.FileNotFoundException e
- (log-message "Unable to launch worker due to "
- (.getMessage e)))
- (catch java.io.IOException e
- (log-message "Unable to launch worker due to "
- (.getMessage e))))
- id)))
- ))
+ (let [valid-new-worker-ids
+ (into {}
+ (remove nil?
+ (dofor [[port assignment] reassign-executors]
+ (let [id (new-worker-ids port)
+ storm-id (:storm-id assignment)
+ ^WorkerResources resources (:resources assignment)
+ mem-onheap (.get_mem_on_heap resources)]
+ ;; This condition checks for required files exist before launching the worker
+ (if (required-topo-files-exist? conf storm-id)
+ (do
+ (log-message "Launching worker with assignment "
+ (pr-str assignment)
+ " for this supervisor "
+ (:supervisor-id supervisor)
+ " on port "
+ port
+ " with id "
+ id)
+ (local-mkdirs (worker-pids-root conf id))
+ (local-mkdirs (worker-heartbeats-root conf id))
+ (launch-worker supervisor
+ (:storm-id assignment)
+ port
+ id
+ mem-onheap)
+ [port id])
--- End diff --
Also, the resulting name of the bound var could be something like
`id->port` or `valid-new-id->port`.
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo!, and we are in the
> process of getting it ready to push back to open source shortly.
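
The atomic symlink switch described in the issue can be illustrated with a small sketch. This is not the Storm blob store code; the class name `AtomicSymlinkSwitch`, the method `switchLink`, and the `.tmp` suffix are hypothetical names chosen for illustration. The sketch assumes a POSIX filesystem, where `Files.move` with `ATOMIC_MOVE` maps to a rename that replaces an existing link; the Java documentation leaves replacement of an existing target implementation-specific.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicSymlinkSwitch {
    /**
     * Repoints the symlink `link` at `newTarget` (hypothetical helper).
     * A reader resolving `link` sees either the old or the new target,
     * never a missing link, assuming the rename is atomic on this platform.
     */
    public static void switchLink(Path link, Path newTarget) throws IOException {
        // Build the replacement link next to the live one so the rename
        // stays within a single filesystem.
        Path tmp = link.resolveSibling(link.getFileName() + ".tmp");
        Files.deleteIfExists(tmp); // clear leftovers from an interrupted run
        Files.createSymbolicLink(tmp, newTarget);
        // Rename the temporary link over the live one in a single step.
        Files.move(tmp, link, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blob-demo");
        Path v1 = Files.write(dir.resolve("blob.v1"), "one".getBytes());
        Path v2 = Files.write(dir.resolve("blob.v2"), "two".getBytes());
        Path link = dir.resolve("current");
        switchLink(link, v1); // initial link
        switchLink(link, v2); // switch to the newly downloaded version
        System.out.println(new String(Files.readAllBytes(link)));
    }
}
```

Creating the new link under a temporary name and renaming it over the old one avoids the window that a delete-then-create sequence would open, during which a worker could find no link at all.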