[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009741#comment-15009741 ]
ASF GitHub Bot commented on STORM-876:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/845#discussion_r45133317

    --- Diff: storm-core/src/clj/backtype/storm/daemon/supervisor.clj ---
    @@ -372,66 +389,50 @@
                 ". State: " state
                 ", Heartbeat: " (pr-str heartbeat))
               (shutdown-worker supervisor id)
    -          (if (:code-distributor supervisor)
    -            (.cleanup (:code-distributor supervisor) id))
    -          ))
    -
    -    (doseq [id (vals new-worker-ids)]
    -      (local-mkdirs (worker-pids-root conf id))
    -      (local-mkdirs (worker-heartbeats-root conf id)))
    -    (ls-approved-workers! local-state
    -          (merge
    -           (select-keys (ls-approved-workers local-state)
    -                        (keys keepers))
    -           (zipmap (vals new-worker-ids) (keys new-worker-ids))
    -           ))
    -
    -    ;; check storm topology code dir exists before launching workers
    -    (doseq [[port assignment] reassign-executors]
    -      (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
    -            storm-id (:storm-id assignment)
    -            cached-assignment-info @(:assignment-versions supervisor)
    -            assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
    -                              (get cached-assignment-info storm-id)
    -                              (.assignment-info-with-version storm-cluster-state storm-id nil))
    -            storm-code-map (read-storm-code-locations assignment-info)
    -            master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
    -            stormroot (supervisor-stormdist-root conf storm-id)]
    -        (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
    -          (download-storm-code conf storm-id master-code-dir supervisor download-lock)) ))
    -
    -    (wait-for-workers-launch
    -      conf
    -      (dofor [[port assignment] reassign-executors]
    -        (let [id (new-worker-ids port)
    -              storm-id (:storm-id assignment)
    -              ^WorkerResources resources (:resources assignment)
    -              mem-onheap (.get_mem_on_heap resources)]
    -          (try
    -            (log-message "Launching worker with assignment "
    -                         (pr-str assignment)
    -                         " for this supervisor "
    -                         (:supervisor-id supervisor)
    -                         " on port "
    -                         port
    -                         " with id "
    -                         id
    -                         )
    -            (launch-worker supervisor
    -                           (:storm-id assignment)
    -                           port
    -                           id
    -                           mem-onheap)
    -            (mark! supervisor:num-workers-launched)
    -            (catch java.io.FileNotFoundException e
    -              (log-message "Unable to launch worker due to "
    -                           (.getMessage e)))
    -            (catch java.io.IOException e
    -              (log-message "Unable to launch worker due to "
    -                           (.getMessage e))))
    -          id)))
    -    ))
    +    (let [valid-new-worker-ids
    +          (into {}
    +            (remove nil?
    +              (dofor [[port assignment] reassign-executors]
    +                (let [id (new-worker-ids port)
    +                      storm-id (:storm-id assignment)
    +                      ^WorkerResources resources (:resources assignment)
    +                      mem-onheap (.get_mem_on_heap resources)]
    +                  ;; This condition checks for required files exist before launching the worker
    +                  (if (required-topo-files-exist? conf storm-id)
    +                    (do
    +                      (log-message "Launching worker with assignment "
    +                                   (pr-str assignment)
    +                                   " for this supervisor "
    +                                   (:supervisor-id supervisor)
    +                                   " on port "
    +                                   port
    +                                   " with id "
    +                                   id)
    +                      (local-mkdirs (worker-pids-root conf id))
    +                      (local-mkdirs (worker-heartbeats-root conf id))
    +                      (launch-worker supervisor
    +                                     (:storm-id assignment)
    +                                     port
    +                                     id
    +                                     mem-onheap)
    +                      [port id])
    +                    (do
    +                      (log-message "Missing topology storm code, so can't launch worker with assignment "
    +                                   (pr-str assignment)
    +                                   " for this supervisor "
    +                                   (:supervisor-id supervisor)
    +                                   " on port "
    +                                   port
    +                                   " with id "
    +                                   id)
    +                      nil))))))]
    +      (ls-approved-workers! local-state
    +        (merge
    +          (select-keys (ls-approved-workers local-state)
    +                       (keys keepers))
    +          (zipmap (vals valid-new-worker-ids) (keys valid-new-worker-ids))))
    --- End diff --

    * Some of the indentation looks odd here.
    * See [above comment](https://github.com/apache/storm/pull/845/files#r45132995) on avoiding map reversal.


> Dist Cache: Basic Functionality
> -------------------------------
>
>                 Key: STORM-876
>                 URL: https://issues.apache.org/jira/browse/STORM-876
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>         Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)