[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990620#comment-14990620 ]
ASF GitHub Bot commented on STORM-876: -------------------------------------- Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/845#discussion_r43952116 --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj --- @@ -984,25 +1108,30 @@ )))) (defn cleanup-corrupt-topologies! [nimbus] - (if (is-leader nimbus :throw-exception false) - (let [storm-cluster-state (:storm-cluster-state nimbus) - code-ids (set (code-ids (:conf nimbus))) - active-topologies (set (.active-storms storm-cluster-state)) - corrupt-topologies (set/difference active-topologies code-ids)] - (doseq [corrupt corrupt-topologies] - (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...") - (.remove-storm! storm-cluster-state corrupt) - ))) - (log-message "not a leader, skipping cleanup-corrupt-topologies")) - -;;setsup code distributor entries for all current topologies for which code is available locally. -(defn setup-code-distributor [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) - locally-available-storm-ids (set (code-ids (:conf nimbus))) + blob-store (:blob-store nimbus) + code-ids (set (code-ids blob-store)) active-topologies (set (.active-storms storm-cluster-state)) - locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)] - (doseq [storm-id locally-available-active-storm-ids] - (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))))) + corrupt-topologies (set/difference active-topologies code-ids)] + (doseq [corrupt corrupt-topologies] + (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...") + (.remove-storm! storm-cluster-state corrupt) + (if (instance? LocalFsBlobStore blob-store) + (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)] + (.remove-blobstore-key! storm-cluster-state blob-key))) + ))) + +;;setsup blobstore for all current keys +(defn setup-blobstore [nimbus] + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + local-set-of-keys (get-key-set-from-blob-store blob-store) + all-keys (set (.active-keys storm-cluster-state)) + locally-available-active-keys (set/intersection local-set-of-keys all-keys)] + (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys) + (doseq [key locally-available-active-keys] + (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-metadata-version blob-store key get-nimbus-subject)) + ))) --- End diff -- dangling parens > Dist Cache: Basic Functionality > ------------------------------- > > Key: STORM-876 > URL: https://issues.apache.org/jira/browse/STORM-876 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Robert Joseph Evans > Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf > > > Basic functionality for the Dist Cache feature. > As part of this a new API should be added to support uploading and > downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar > should be written into the blob store instead of residing locally. We need a > default implementation of the blob store that does essentially what nimbus > currently does and does not need anything extra. But having an HDFS backend > too would be great for scalability and HA. > The supervisor should provide a way to download and manage these blobs and > provide a working directory for the worker process with symlinks to the > blobs. It should also allow the blobs to be updated and switch the symlink > atomically to point to the new blob once it is downloaded. > All of this is already done by code internal to Yahoo! we are in the process > of getting it ready to push back to open source shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)