[ https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009218#comment-15009218 ]
ASF GitHub Bot commented on STORM-876: -------------------------------------- Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/845#discussion_r45100258 --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj --- @@ -390,53 +444,98 @@ [(.getNodeId slot) (.getPort slot)] ))) +(defn- get-version-for-key [key nimbus-host-port-info conf] + (let [version (KeyVersion. key nimbus-host-port-info)] + (.getKeyVersion version conf))) + +(defn get-key-seq-from-blob-store [blob-store] + (let [key-iter (.listKeys blob-store nimbus-subject)] + (iterator-seq key-iter))) + (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] - (let [stormroot (master-stormdist-root conf storm-id)] - (log-message "nimbus file location:" stormroot) - (FileUtils/forceMkdir (File. stormroot)) - (FileUtils/cleanDirectory (File. stormroot)) - (setup-jar conf tmp-jar-location stormroot) - (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) - (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf)) - (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id)) - )) + (let [subject (get-subject) + storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + jar-key (master-stormjar-key storm-id) + code-key (master-stormcode-key storm-id) + conf-key (master-stormconf-key storm-id) + nimbus-host-port-info (:nimbus-host-port-info nimbus)] + (when tmp-jar-location ;;in local mode there is no jar + (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf)))) + (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf))) + (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf))))) + +(defn- read-storm-topology [storm-id blob-store] + (Utils/deserialize + (.readBlob blob-store (master-stormcode-key storm-id) (get-subject)) StormTopology)) + +(defn- get-blob-replication-count [blob-key nimbus] + (if (:blob-store nimbus) + (-> (:blob-store nimbus) + (.getBlobReplication blob-key nimbus-subject)))) (defn- wait-for-desired-code-replication [nimbus conf storm-id] (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT) max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC) - total-wait-time (atom 0) - current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))] - (if (:code-distributor nimbus) - (while (and (> min-replication-count @current-replication-count) - (or (= -1 max-replication-wait-time) - (< @total-wait-time max-replication-wait-time))) + current-replication-count-jar (if (not (local-mode? conf)) + (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus)) + (atom min-replication-count)) + current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) + current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus)) + total-wait-time (atom 0)] + (if (:blob-store nimbus) + (while (and + (or (> min-replication-count @current-replication-count-jar) + (> min-replication-count @current-replication-count-code) + (> min-replication-count @current-replication-count-conf)) + (or (= -1 max-replication-wait-time) + (< @total-wait-time max-replication-wait-time))) (sleep-secs 1) (log-debug "waiting for desired replication to be achieved. min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time - "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time) + (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar) + "current-replication-count for code key = " @current-replication-count-code + "current-replication-count for conf key = " @current-replication-count-conf + " total-wait-time " @total-wait-time) (swap! total-wait-time inc) - (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id)))) - (if (< min-replication-count @current-replication-count) - (log-message "desired replication count " min-replication-count " achieved, - current-replication-count" @current-replication-count) - (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time " - max-replication-wait-time " so moving on with replication count = " @current-replication-count) - ))) - -(defn- read-storm-topology [conf storm-id] - (let [stormroot (master-stormdist-root conf storm-id)] - (Utils/deserialize - (FileUtils/readFileToByteArray - (File. (master-stormcode-path stormroot)) - ) StormTopology))) + (if (not (local-mode? conf)) + (reset! current-replication-count-conf (get-blob-replication-count (master-stormconf-key storm-id)))) + (reset! current-replication-count-code (get-blob-replication-count (master-stormcode-key storm-id))) + (reset! current-replication-count-jar (get-blob-replication-count (master-stormjar-key storm-id))))) --- End diff -- Re-posting since my previous comment was not addressed: Here we are calling `get-blob-replication-count` three times with one argument, while the arity of the funciton is 2. I think the missing argument here might be `nimbus`. > Dist Cache: Basic Functionality > ------------------------------- > > Key: STORM-876 > URL: https://issues.apache.org/jira/browse/STORM-876 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Robert Joseph Evans > Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf > > > Basic functionality for the Dist Cache feature. > As part of this a new API should be added to support uploading and > downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar > should be written into the blob store instead of residing locally. We need a > default implementation of the blob store that does essentially what nimbus > currently does and does not need anything extra. But having an HDFS backend > too would be great for scalability and HA. > The supervisor should provide a way to download and manage these blobs and > provide a working directory for the worker process with symlinks to the > blobs. It should also allow the blobs to be updated and switch the symlink > atomically to point to the new blob once it is downloaded. > All of this is already done by code internal to Yahoo! we are in the process > of getting it ready to push back to open source shortly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)