http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0da88b0..f093ce5 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,7 @@ <java_jmx.version>0.3.1</java_jmx.version> <compojure.version>1.1.3</compojure.version> <hiccup.version>0.3.6</hiccup.version> + <commons-compress.version>1.4.1</commons-compress.version> <commons-io.version>2.4</commons-io.version> <commons-lang.version>2.5</commons-lang.version> <commons-exec.version>1.1</commons-exec.version> @@ -355,6 +356,11 @@ </dependency> <dependency> <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>${commons-compress.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>${commons-exec.version}</version> </dependency> @@ -469,7 +475,12 @@ <artifactId>curator-client</artifactId> <version>${curator.version}</version> </dependency> - + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> @@ -659,6 +670,7 @@ <version>${thrift.version}</version> <scope>compile</scope> </dependency> + <!-- used by examples/storm-starter --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -818,6 +830,8 @@ <exclude>**/.idea/**</exclude> <!-- module specific testing artifacts --> <exclude>**/metastore_db/**</exclude> + <!-- anything written into build should be ignored --> + <exclude>**/build/**</exclude> <!-- exclude CHANGELOG, VERSION, AND TODO files --> <exclude>**/CHANGELOG.md</exclude>
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 083cdca..72c4a3a 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -36,6 +36,17 @@ </properties> <dependencies> + <!--Hadoop Mini Cluster cannot use log4j2 bridge, + Surefire has a way to exclude the conflicting log4j API jar + from the classpath, classpathDependencyExcludes, but it didn't work in practice. + This is here as a work around to place it at the beginning of the classpath + even though maven does not officially support ordering of the classpath.--> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> @@ -140,6 +151,10 @@ </dependency> <dependency> <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <scope>compile</scope> </dependency> @@ -178,7 +193,6 @@ </exclusion> </exclusions> </dependency> - <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> @@ -193,7 +207,20 @@ </exclusion> </exclusions> </dependency> - + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> @@ -433,6 +460,7 @@ <include>org.yaml:snakeyaml</include> <include>org.jgrapht:jgrapht-core</include> <include>org.apache.commons:commons-exec</include> + <include>org.apache.commons:commons-compress</include> <include>commons-io:commons-io</include> <include>commons-codec:commons-codec</include> <include>commons-fileupload:commons-fileupload</include> @@ -574,6 +602,10 @@ <shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern> </relocation> <relocation> + <pattern>org.apache.commons.compress</pattern> + <shadedPattern>org.apache.storm.shade.org.apache.commons.compress</shadedPattern> + </relocation> + <relocation> <pattern>org.apache.commons.codec</pattern> <shadedPattern>org.apache.storm.shade.org.apache.commons.codec</shadedPattern> </relocation> http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/blobstore.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/blobstore.clj b/storm-core/src/clj/backtype/storm/blobstore.clj new file mode 100644 index 0000000..936f4b5 --- /dev/null +++ b/storm-core/src/clj/backtype/storm/blobstore.clj @@ -0,0 +1,28 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. + +(ns backtype.storm.blobstore + (:import [backtype.storm.utils Utils]) + (:import [backtype.storm.blobstore ClientBlobStore]) + (:use [backtype.storm config])) + +(defmacro with-configured-blob-client + [client-sym & body] + `(let [conf# (read-storm-config) + ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)] + (try + ~@body + (finally (.shutdown ~client-sym))))) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 35aa8c8..ebe4955 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -45,18 +45,14 @@ (log-debug "Creating cluster state: " (.toString clazz)) (or (.mkState state-instance conf auth-conf acls context) nil))) - (defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) (assignment-info-with-version [this storm-id callback]) (assignment-version [this storm-id callback]) - ;returns topologyIds under /stormroot/code-distributor - (code-distributor [this callback]) - ;returns lits of nimbusinfos under /stormroot/code-distributor/storm-id - (code-distributor-info [this storm-id]) - + ;returns key information under /storm/blobstore/key + (blobstore-info [this blob-key]) ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data> (nimbuses [this]) ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id @@ -90,9 +86,14 @@ (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) - ;adds nimbusinfo under /stormroot/code-distributor/storm-id - (setup-code-distributor! [this storm-id info]) + ;; sets up information related to key consisting of nimbus + ;; host:port and version info of the blob + (setup-blobstore! [this key nimbusInfo versionInfo]) + (active-keys [this]) + (blobstore [this callback]) (remove-storm! [this storm-id]) + (remove-blobstore-key! [this blob-key]) + (remove-key-version! [this blob-key]) (report-error [this storm-id component-id node port error]) (errors [this storm-id component-id]) (last-error [this storm-id component-id]) @@ -107,7 +108,9 @@ (def WORKERBEATS-ROOT "workerbeats") (def BACKPRESSURE-ROOT "backpressure") (def ERRORS-ROOT "errors") -(def CODE-DISTRIBUTOR-ROOT "code-distributor") +(def BLOBSTORE-ROOT "blobstore") +; Stores the latest update sequence for a blob +(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber") (def NIMBUSES-ROOT "nimbuses") (def CREDENTIALS-ROOT "credentials") (def LOGCONFIG-ROOT "logconfigs") @@ -119,7 +122,9 @@ (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) (def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT)) (def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) -(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT)) +;; Blobstore subtree /storm/blobstore +(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT)) +(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT)) (def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT)) (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT)) (def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT)) @@ -133,9 +138,13 @@ [id] (str ASSIGNMENTS-SUBTREE "/" id)) -(defn code-distributor-path - [id] - (str CODE-DISTRIBUTOR-SUBTREE "/" id)) +(defn blobstore-path + [key] + (str BLOBSTORE-SUBTREE "/" key)) + +(defn blobstore-max-key-sequence-number-path + [key] + (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key)) (defn nimbus-path [id] @@ -244,7 +253,7 @@ backpressure-callback (atom {}) ;; we want to reigister a topo directory getChildren callback for all workers of this dir assignments-callback (atom nil) storm-base-callback (atom {}) - code-distributor-callback (atom nil) + blobstore-callback (atom nil) credentials-callback (atom {}) log-config-callback (atom {}) state-id (.register @@ -259,14 +268,14 @@ (issue-map-callback! assignment-version-callback (first args)) (issue-map-callback! assignment-info-with-version-callback (first args)))) SUPERVISORS-ROOT (issue-callback! supervisors-callback) - CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback) + BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore STORMS-ROOT (issue-map-callback! storm-base-callback (first args)) CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args)) BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args)) ;; this should never happen (exit-process! 30 "Unknown callback for subtree " subtree args)))))] - (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE + (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE LOGCONFIG-SUBTREE]] (.mkdirs cluster-state p acls)) (reify @@ -299,13 +308,13 @@ (swap! assignment-version-callback assoc storm-id callback)) (.get_version cluster-state (assignment-path storm-id) (not-nil? callback))) - (code-distributor + ;; blobstore state + (blobstore [this callback] (when callback - (reset! code-distributor-callback callback)) - (do - (.sync_path cluster-state CODE-DISTRIBUTOR-SUBTREE) - (.get_children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback)))) + (reset! blobstore-callback callback)) + (.sync_path cluster-state BLOBSTORE-SUBTREE) + (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback))) (nimbuses [this] @@ -327,18 +336,29 @@ (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)) - (code-distributor-info - [this storm-id] - (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) - (let [path (code-distributor-path storm-id)] - (do - (.sync_path cluster-state path) - (.get_children cluster-state path false))))) + (setup-blobstore! + [this key nimbusInfo versionInfo] + (let [path (str (blobstore-path key) "/" (.toHostPortString nimbusInfo) "-" versionInfo)] + (log-message "setup-path" path) + (.mkdirs cluster-state (blobstore-path key) acls) + ;we delete the node first to ensure the node gets created as part of this session only. + (.delete_node_blobstore cluster-state (str (blobstore-path key)) (.toHostPortString nimbusInfo)) + (.set_ephemeral_node cluster-state path nil acls))) + + (blobstore-info + [this blob-key] + (let [path (blobstore-path blob-key)] + (.sync_path cluster-state path) + (.get_children cluster-state path false))) (active-storms [this] (.get_children cluster-state STORMS-SUBTREE false)) + (active-keys + [this] + (.get_children cluster-state BLOBSTORE-SUBTREE false)) + (heartbeat-storms [this] (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false)) @@ -526,18 +546,18 @@ (let [thrift-assignment (thriftify-assignment info)] (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) - (setup-code-distributor! - [this storm-id nimbusInfo] - (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))] - (.mkdirs cluster-state (code-distributor-path storm-id) acls) - ;we delete the node first to ensure the node gets created as part of this session only. - (.delete_node cluster-state path) - (.set_ephemeral_node cluster-state path nil acls))) + (remove-blobstore-key! + [this blob-key] + (log-debug "removing key" blob-key) + (.delete_node cluster-state (blobstore-path blob-key))) + + (remove-key-version! + [this blob-key] + (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key))) (remove-storm! [this storm-id] (.delete_node cluster-state (assignment-path storm-id)) - (.delete_node cluster-state (code-distributor-path storm-id)) (.delete_node cluster-state (credentials-path storm-id)) (.delete_node cluster-state (log-config-path storm-id)) (.delete_node cluster-state (profiler-config-path storm-id)) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj index ff942db..fa36240 100644 --- a/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj +++ b/storm-core/src/clj/backtype/storm/cluster_state/zookeeper_state_factory.clj @@ -149,6 +149,10 @@ [this path] (zk/sync-path zk-writer path)) + (delete-node-blobstore + [this path nimbus-host-port-info] + (zk/delete-node-blobstore zk-writer path nimbus-host-port-info)) + (close [this] (reset! active false) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/command/blobstore.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/command/blobstore.clj b/storm-core/src/clj/backtype/storm/command/blobstore.clj new file mode 100644 index 0000000..ae7f919 --- /dev/null +++ b/storm-core/src/clj/backtype/storm/command/blobstore.clj @@ -0,0 +1,162 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns backtype.storm.command.blobstore + (:import [java.io InputStream OutputStream] + [backtype.storm.generated SettableBlobMeta AccessControl AuthorizationException + KeyNotFoundException] + [backtype.storm.blobstore BlobStoreAclHandler]) + (:use [backtype.storm config] + [clojure.string :only [split]] + [clojure.tools.cli :only [cli]] + [clojure.java.io :only [copy input-stream output-stream]] + [backtype.storm blobstore log util]) + (:gen-class)) + +(defn update-blob-from-stream + "Update a blob in the blob store from an InputStream" + [key ^InputStream in] + (with-configured-blob-client blobstore + (let [out (.updateBlob blobstore key)] + (try + (copy in out) + (.close out) + (catch Exception e + (log-message e) + (.cancel out) + (throw e)))))) + +(defn create-blob-from-stream + "Create a blob in the blob store from an InputStream" + [key ^InputStream in ^SettableBlobMeta meta] + (with-configured-blob-client blobstore + (let [out (.createBlob blobstore key meta)] + (try + (copy in out) + (.close out) + (catch Exception e + (.cancel out) + (throw e)))))) + +(defn read-blob + "Read a blob in the blob store and write to an OutputStream" + [key ^OutputStream out] + (with-configured-blob-client blobstore + (with-open [in (.getBlob blobstore key)] + (copy in out)))) + +(defn as-access-control + "Convert a parameter to an AccessControl object" + [param] + (BlobStoreAclHandler/parseAccessControl (str param))) + +(defn as-acl + [param] + (map as-access-control (split param #","))) + +(defn access-control-str + [^AccessControl acl] + (BlobStoreAclHandler/accessControlToString acl)) + +(defn read-cli [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (output-stream file)] + (read-blob key f)) + (read-blob key System/out)))) + +(defn update-cli [args] + (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])] + (if file + (with-open [f (input-stream file)] + (update-blob-from-stream key f)) + (update-blob-from-stream key System/in)) + (log-message "Successfully updated " key))) + +(defn create-cli [args] + (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil] + ["-a" "--acl" :default [] :parse-fn as-acl] + ["-r" "--replication-factor" :default -1 :parse-fn parse-int]) + meta (doto (SettableBlobMeta. acl) + (.set_replication_factor replication-factor))] + (validate-key-name! key) + (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl))) + (if file + (with-open [f (input-stream file)] + (create-blob-from-stream key f meta)) + (create-blob-from-stream key System/in meta)) + (log-message "Successfully created " key))) + +(defn delete-cli [args] + (with-configured-blob-client blobstore + (doseq [key args] + (.deleteBlob blobstore key) + (log-message "deleted " key)))) + +(defn list-cli [args] + (with-configured-blob-client blobstore + (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)] + (doseq [key keys] + (try + (let [meta (.getBlobMeta blobstore key) + version (.get_version meta) + acl (.get_acl (.get_settable meta))] + (log-message key " " version " " (pr-str (map access-control-str acl)))) + (catch AuthorizationException ae + (if-not (empty? args) (log-error "ACCESS DENIED to key: " key))) + (catch KeyNotFoundException knf + (if-not (empty? args) (log-error key " NOT FOUND")))))))) + +(defn set-acl-cli [args] + (let [[{set-acl :set} [key] _] + (cli args ["-s" "--set" :default [] :parse-fn as-acl])] + (with-configured-blob-client blobstore + (let [meta (.getBlobMeta blobstore key) + acl (.get_acl (.get_settable meta)) + new-acl (if set-acl set-acl acl) + new-meta (SettableBlobMeta. new-acl)] + (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl))) + (.setBlobMeta blobstore key new-meta))))) + +(defn rep-cli [args] + (let [sub-command (first args) + new-args (rest args)] + (with-configured-blob-client blobstore + (condp = sub-command + "--read" (let [key (first new-args) + blob-replication (.getBlobReplication blobstore key)] + (log-message "Current replication factor " blob-replication) + blob-replication) + "--update" (let [[{replication-factor :replication-factor} [key] _] + (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])] + (if (nil? replication-factor) + (throw (RuntimeException. (str "Please set the replication factor"))) + (let [blob-replication (.updateBlobReplication blobstore key replication-factor)] + (log-message "Replication factor is set to " blob-replication) + blob-replication))) + :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command"))))))) + +(defn -main [& args] + (let [command (first args) + new-args (rest args)] + (condp = command + "cat" (read-cli new-args) + "create" (create-cli new-args) + "update" (update-cli new-args) + "delete" (delete-cli new-args) + "list" (list-cli new-args) + "set-acl" (set-acl-cli new-args) + "replication" (rep-cli new-args) + :else (throw (RuntimeException. (str command " is not a supported blobstore command")))))) http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/config.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 94b66c3..1617a3b 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -107,6 +107,18 @@ (FileUtils/forceMkdir (File. ret)) ret)) +(defn master-stormjar-key + [topology-id] + (str topology-id "-stormjar.jar")) + +(defn master-stormcode-key + [topology-id] + (str topology-id "-stormcode.ser")) + +(defn master-stormconf-key + [topology-id] + (str topology-id "-stormconf.ser")) + (defn master-stormdist-root ([conf] (str (master-local-dir conf) file-path-separator "stormdist")) @@ -119,6 +131,10 @@ (FileUtils/forceMkdir (File. ret)) ret )) +(defn read-supervisor-storm-conf-given-path + [conf stormconf-path] + (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path)))))) + (defn master-storm-metafile-path [stormroot ] (str stormroot file-path-separator "storm-code-distributor.meta")) @@ -197,7 +213,7 @@ (let [stormroot (supervisor-stormdist-root conf storm-id) conf-path (supervisor-stormconf-path stormroot) topology-path (supervisor-stormcode-path stormroot)] - (merge conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. conf-path))))))) + (read-supervisor-storm-conf-given-path conf conf-path))) (defn read-supervisor-topology [conf storm-id] @@ -221,7 +237,11 @@ nil ))) - +(defn get-id-from-blob-key + [key] + (if-let [groups (re-find #"^(.*)((-stormjar\.jar)|(-stormcode\.ser)|(-stormconf\.ser))$" key)] + (nth groups 1))) + (defn set-worker-user! [conf worker-id user] (log-message "SET worker-user " worker-id " " user) (let [file (worker-user-file conf worker-id)] http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 71d4654..a53ff82 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -15,14 +15,22 @@ ;; limitations under the License. (ns backtype.storm.daemon.nimbus (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]) + (:import [backtype.storm.generated KeyNotFoundException]) + (:import [backtype.storm.blobstore LocalFsBlobStore]) (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory]) (:import [org.apache.thrift.exception]) (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket]) (:import [org.apache.commons.io FileUtils]) + (:import [javax.security.auth Subject]) + (:import [backtype.storm.security.auth NimbusPrincipal]) (:import [java.nio ByteBuffer] [java.util Collections List HashMap] [backtype.storm.generated NimbusSummary]) - (:import [java.io FileNotFoundException File FileOutputStream]) + (:import [java.nio ByteBuffer] + [java.util Collections List HashMap ArrayList Iterator]) + (:import [backtype.storm.blobstore AtomicOutputStream BlobStoreAclHandler + InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer]) + (:import [java.io File FileOutputStream FileInputStream]) (:import [java.net InetAddress]) (:import [java.nio.channels Channels WritableByteChannel]) (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) @@ -31,12 +39,12 @@ Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) (:import [backtype.storm.nimbus NimbusInfo]) (:import [backtype.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ThriftTopologyUtils - BufferFileInputStream]) + BufferFileInputStream BufferInputStream]) (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo - ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice - ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction + ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta + BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction ProfileRequest ProfileAction NodeInfo]) (:import [backtype.storm.daemon Shutdownable]) (:use [backtype.storm util config log timer zookeeper local-state]) @@ -47,6 +55,7 @@ (:require [clojure.set :as set]) (:import [backtype.storm.daemon.common StormBase Assignment]) (:use [backtype.storm.daemon common]) + (:use [backtype.storm config]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) (:import [backtype.storm.utils VersionInfo]) (:require [clj-time.core :as time]) @@ -112,8 +121,7 @@ scheduler )) -(defmulti mk-code-distributor cluster-mode) -(defmulti sync-code cluster-mode) +(defmulti blob-sync cluster-mode) (defnk is-leader [nimbus :throw-exception true] (let [leader-elector (:leader-elector nimbus)] @@ -126,6 +134,25 @@ [(first ZooDefs$Ids/CREATOR_ALL_ACL) (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)]) +(defn mk-blob-cache-map + "Constructs a TimeCacheMap instance with a blob store timeout whose + expiration callback invokes cancel on the value held by an expired entry when + that value is an AtomicOutputStream and calls close otherwise." + [conf] + (TimeCacheMap. + (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS)) + (reify TimeCacheMap$ExpiredCallback + (expire [this id stream] + (if (instance? AtomicOutputStream stream) + (.cancel stream) + (.close stream)))))) + +(defn mk-bloblist-cache-map + "Constructs a TimeCacheMap instance with a blobstore timeout and no callback + function." + [conf] + (TimeCacheMap. (int (conf NIMBUS-BLOBSTORE-EXPIRATION-SECS)))) + (defn create-tology-action-notifier [conf] (when-not (clojure.string/blank? (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)) (let [instance (new-instance (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))] @@ -153,6 +180,10 @@ :heartbeats-cache (atom {}) :downloaders (file-cache-map conf) :uploaders (file-cache-map conf) + :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf)) + :blob-downloaders (mk-blob-cache-map conf) + :blob-uploaders (mk-blob-cache-map conf) + :blob-listers (mk-bloblist-cache-map conf) :uptime (uptime-computer) :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR)) :timer (mk-timer :kill-fn (fn [t] @@ -161,7 +192,6 @@ )) :scheduler (mk-scheduler conf inimbus) :leader-elector (zk-leader-elector conf) - :code-distributor (mk-code-distributor conf) :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies @@ -175,22 +205,44 @@ (defn inbox [nimbus] (master-inbox (:conf nimbus))) -(defn- read-storm-conf [conf storm-id] - (let [stormroot (master-stormdist-root conf storm-id)] - (merge conf - (clojurify-structure - (Utils/fromCompressedJsonConf - (FileUtils/readFileToByteArray - (File. (master-stormconf-path stormroot)))))))) +(defn- get-subject [] + (let [req (ReqContext/context)] + (.subject req))) + +(def user-subject + (get-subject)) + +(defn- read-storm-conf [conf storm-id blob-store] + (clojurify-structure + (Utils/fromCompressedJsonConf + (.readBlob blob-store (master-stormconf-key storm-id) user-subject)))) (declare delay-event) (declare mk-assignments) +(defn get-nimbus-subject + [] + (let [subject (Subject.) + principal (NimbusPrincipal.) + principals (.getPrincipals subject)] + (.add principals principal) + subject)) + +(def nimbus-subject + (get-nimbus-subject)) + +(defn- get-key-list-from-id + [conf id] + (log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)}) + (if (local-mode? conf) + [(master-stormcode-key id) (master-stormconf-key id)] + [(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)])) + (defn kill-transition [nimbus storm-id] (fn [kill-time] (let [delay (if kill-time kill-time - (get (read-storm-conf (:conf nimbus) storm-id) + (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus)) TOPOLOGY-MESSAGE-TIMEOUT-SECS))] (delay-event nimbus storm-id @@ -205,7 +257,7 @@ (fn [time num-workers executor-overrides] (let [delay (if time time - (get (read-storm-conf (:conf nimbus) storm-id) + (get (read-storm-conf (:conf nimbus) storm-id (:blob-store nimbus)) TOPOLOGY-MESSAGE-TIMEOUT-SECS))] (delay-event nimbus storm-id @@ -250,6 +302,10 @@ (log-message "Killing topology: " storm-id) (.remove-storm! (:storm-cluster-state nimbus) storm-id) + (when (instance? LocalFsBlobStore (:blob-store nimbus)) + (doseq [blob-key (get-key-list-from-id (:conf nimbus) storm-id)] + (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) + (.remove-key-version! (:storm-cluster-state nimbus) blob-key))) nil) } :rebalancing {:startup (fn [] (delay-event nimbus @@ -391,53 +447,99 @@ [(.getNodeId slot) (.getPort slot)] ))) +(defn- get-version-for-key [key nimbus-host-port-info conf] + (let [version (KeySequenceNumber. key nimbus-host-port-info)] + (.getKeySequenceNumber version conf))) + +(defn get-key-seq-from-blob-store [blob-store] + (let [key-iter (.listKeys blob-store)] + (iterator-seq key-iter))) + (defn- setup-storm-code [nimbus conf storm-id tmp-jar-location storm-conf topology] - (let [stormroot (master-stormdist-root conf storm-id)] - (log-message "nimbus file location:" stormroot) - (FileUtils/forceMkdir (File. stormroot)) - (FileUtils/cleanDirectory (File. stormroot)) - (setup-jar conf tmp-jar-location stormroot) - (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) - (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf)) - (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id)) - )) + (let [subject user-subject + storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + jar-key (master-stormjar-key storm-id) + code-key (master-stormcode-key storm-id) + conf-key (master-stormconf-key storm-id) + nimbus-host-port-info (:nimbus-host-port-info nimbus)] + (when tmp-jar-location ;;in local mode there is no jar + (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf)))) + (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf))) + (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf))))) + +(defn- read-storm-topology [storm-id blob-store] + (Utils/deserialize + (.readBlob blob-store (master-stormcode-key storm-id) user-subject) StormTopology)) + +(defn get-blob-replication-count + [blob-key nimbus] + (if (:blob-store nimbus) + (-> (:blob-store nimbus) + (.getBlobReplication blob-key nimbus-subject)))) (defn- wait-for-desired-code-replication [nimbus conf storm-id] (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT) max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC) - total-wait-time (atom 0) - current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))] - (if (:code-distributor nimbus) - (while (and (> min-replication-count @current-replication-count) - (or (= -1 max-replication-wait-time) - (< @total-wait-time max-replication-wait-time))) + current-replication-count-jar (if (not (local-mode? conf)) + (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus)) + (atom min-replication-count)) + current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) + current-replication-count-conf (atom (get-blob-replication-count (master-stormconf-key storm-id) nimbus)) + total-wait-time (atom 0)] + (if (:blob-store nimbus) + (while (and + (or (> min-replication-count @current-replication-count-jar) + (> min-replication-count @current-replication-count-code) + (> min-replication-count @current-replication-count-conf)) + (or (neg? max-replication-wait-time) + (< @total-wait-time max-replication-wait-time))) (sleep-secs 1) (log-debug "waiting for desired replication to be achieved. min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time - "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time) + (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar) + "current-replication-count for code key = " @current-replication-count-code + "current-replication-count for conf key = " @current-replication-count-conf + " total-wait-time " @total-wait-time) (swap! total-wait-time inc) - (reset! current-replication-count (.getReplicationCount (:code-distributor nimbus) storm-id)))) - (if (< min-replication-count @current-replication-count) - (log-message "desired replication count " min-replication-count " achieved, - current-replication-count" @current-replication-count) - (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time " - max-replication-wait-time " so moving on with replication count = " @current-replication-count) - ))) - -(defn- read-storm-topology [conf storm-id] - (let [stormroot (master-stormdist-root conf storm-id)] - (Utils/deserialize - (FileUtils/readFileToByteArray - (File. (master-stormcode-path stormroot)) - ) StormTopology))) + (if (not (local-mode? conf)) + (reset! current-replication-count-conf (get-blob-replication-count (master-stormconf-key storm-id) nimbus))) + (reset! current-replication-count-code (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) + (reset! current-replication-count-jar (get-blob-replication-count (master-stormjar-key storm-id) nimbus)))) + (if (and (< min-replication-count @current-replication-count-conf) + (< min-replication-count @current-replication-count-code) + (< min-replication-count @current-replication-count-jar)) + (log-message "desired replication count of " min-replication-count " not achieved but we have hit the max wait time " + max-replication-wait-time " so moving on with replication count for conf key = " @current-replication-count-conf + " for code key = " @current-replication-count-code "for jar key = " @current-replication-count-jar) + (log-message "desired replication count " min-replication-count " achieved, " + "current-replication-count for conf key = " @current-replication-count-conf ", " + "current-replication-count for code key = " @current-replication-count-code ", " + "current-replication-count for jar key = " @current-replication-count-jar)))) + +(defn- read-storm-topology-as-nimbus [storm-id blob-store] + (Utils/deserialize + (.readBlob blob-store (master-stormcode-key storm-id) nimbus-subject) StormTopology)) (declare compute-executor->component) +(defn read-storm-conf-as-nimbus [storm-id blob-store] + (clojurify-structure + (Utils/fromCompressedJsonConf + (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject)))) + (defn read-topology-details [nimbus storm-id] (let [conf (:conf nimbus) + blob-store (:blob-store nimbus) storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) - topology-conf (read-storm-conf conf storm-id) - topology (read-storm-topology conf storm-id) + topology-conf (read-storm-conf-as-nimbus storm-id blob-store) + topology (read-storm-topology-as-nimbus storm-id blob-store) executor->component (->> (compute-executor->component nimbus storm-id) (map-key (fn [[start-task end-task]] (ExecutorDetails. (int start-task) (int end-task)))))] @@ -530,10 +632,11 @@ (defn- compute-executors [nimbus storm-id] (let [conf (:conf nimbus) + blob-store (:blob-store nimbus) storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) component->executors (:component->executors storm-base) - storm-conf (read-storm-conf conf storm-id) - topology (read-storm-topology conf storm-id) + storm-conf (read-storm-conf-as-nimbus storm-id blob-store) + topology (read-storm-topology-as-nimbus storm-id blob-store) task->component (storm-task-info topology storm-conf)] (->> (storm-task-info topology storm-conf) reverse-map @@ -546,9 +649,10 @@ (defn- compute-executor->component [nimbus storm-id] (let [conf (:conf nimbus) + blob-store (:blob-store nimbus) executors (compute-executors nimbus storm-id) - topology (read-storm-topology conf storm-id) - storm-conf (read-storm-conf conf storm-id) + topology (read-storm-topology-as-nimbus storm-id blob-store) + storm-conf (read-storm-conf-as-nimbus storm-id blob-store) task->component (storm-task-info topology storm-conf) executor->component (into {} (for [executor executors :let [start-task (first executor) @@ -838,7 +942,7 @@ ))) worker->resources (get new-assigned-worker->resources topology-id)]] {topology-id (Assignment. - (master-stormdist-root conf topology-id) + (conf STORM-LOCAL-DIR) (select-keys all-node->host all-nodes) executor->node+port start-times @@ -875,8 +979,9 @@ {:pre [(#{:active :inactive} topology-initial-status)]} (let [storm-cluster-state (:storm-cluster-state nimbus) conf (:conf nimbus) - storm-conf (read-storm-conf conf storm-id) - topology (system-topology! storm-conf (read-storm-topology conf storm-id)) + blob-store (:blob-store nimbus) + storm-conf (read-storm-conf conf storm-id blob-store) + topology (system-topology! storm-conf (read-storm-topology storm-id blob-store)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) (.activate-storm! storm-cluster-state @@ -935,17 +1040,15 @@ ([nimbus storm-name storm-conf operation] (check-authorization! nimbus storm-name storm-conf operation (ReqContext/context)))) -(defn code-ids [conf] - (-> conf - master-stormdist-root - read-dir-contents - set - )) +(defn code-ids [blob-store] + (let [to-id (reify KeyFilter + (filter [this key] (get-id-from-blob-key key)))] + (set (.filterAndListKeys blob-store to-id)))) -(defn cleanup-storm-ids [conf storm-cluster-state] +(defn cleanup-storm-ids [conf storm-cluster-state blob-store] (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state)) error-ids (set (.error-topologies storm-cluster-state)) - code-ids (code-ids conf) + code-ids (code-ids blob-store) assigned-ids (set (.active-storms storm-cluster-state))] (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids) )) @@ -1006,22 +1109,35 @@ TOPOLOGY-EVENTLOGGER-EXECUTORS (total-conf TOPOLOGY-EVENTLOGGER-EXECUTORS) TOPOLOGY-MAX-TASK-PARALLELISM (total-conf TOPOLOGY-MAX-TASK-PARALLELISM)}))) +(defn blob-rm-key [blob-store key storm-cluster-state] + (try + (.deleteBlob blob-store key nimbus-subject) + (if (instance? LocalFsBlobStore blob-store) + (.remove-blobstore-key! storm-cluster-state key)) + (catch Exception e + (log-message "Exception" e)))) + +(defn blob-rm-topology-keys [id blob-store storm-cluster-state] + (blob-rm-key blob-store (master-stormjar-key id) storm-cluster-state) + (blob-rm-key blob-store (master-stormconf-key id) storm-cluster-state) + (blob-rm-key blob-store (master-stormcode-key id) storm-cluster-state)) + (defn do-cleanup [nimbus] (if (is-leader nimbus :throw-exception false) (let [storm-cluster-state (:storm-cluster-state nimbus) conf (:conf nimbus) - submit-lock (:submit-lock nimbus)] + submit-lock (:submit-lock nimbus) + blob-store (:blob-store nimbus)] (let [to-cleanup-ids (locking submit-lock - (cleanup-storm-ids conf storm-cluster-state))] + (cleanup-storm-ids conf storm-cluster-state blob-store))] (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (if (:code-distributor nimbus) (.cleanup (:code-distributor nimbus) id)) (.teardown-heartbeats! storm-cluster-state id) (.teardown-topology-errors! storm-cluster-state id) (rmr (master-stormdist-root conf id)) - (swap! (:heartbeats-cache nimbus) dissoc id)) - ))) + (blob-rm-topology-keys id blob-store storm-cluster-state) + (swap! (:heartbeats-cache nimbus) dissoc id))))) (log-message "not a leader, skipping cleanup"))) (defn- file-older-than? [now seconds file] @@ -1036,8 +1152,7 @@ (if (.delete f) (log-message "Cleaning inbox ... deleted: " (.getName f)) ;; This should never happen - (log-error "Cleaning inbox ... error deleting: " (.getName f)) - )))) + (log-error "Cleaning inbox ... error deleting: " (.getName f)))))) (defn clean-topology-history "Deletes topologies from history older than minutes." @@ -1051,25 +1166,34 @@ (ls-topo-hist! topo-history-state new-history)))) (defn cleanup-corrupt-topologies! [nimbus] - (if (is-leader nimbus :throw-exception false) - (let [storm-cluster-state (:storm-cluster-state nimbus) - code-ids (set (code-ids (:conf nimbus))) - active-topologies (set (.active-storms storm-cluster-state)) - corrupt-topologies (set/difference active-topologies code-ids)] - (doseq [corrupt corrupt-topologies] - (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...") - (.remove-storm! storm-cluster-state corrupt) - ))) - (log-message "not a leader, skipping cleanup-corrupt-topologies")) - -;;setsup code distributor entries for all current topologies for which code is available locally. -(defn setup-code-distributor [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) - locally-available-storm-ids (set (code-ids (:conf nimbus))) + blob-store (:blob-store nimbus) + code-ids (set (code-ids blob-store)) active-topologies (set (.active-storms storm-cluster-state)) - locally-available-active-storm-ids (set/intersection locally-available-storm-ids active-topologies)] - (doseq [storm-id locally-available-active-storm-ids] - (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus))))) + corrupt-topologies (set/difference active-topologies code-ids)] + (doseq [corrupt corrupt-topologies] + (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...") + (.remove-storm! storm-cluster-state corrupt) + (if (instance? LocalFsBlobStore blob-store) + (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)] + (.remove-blobstore-key! storm-cluster-state blob-key)))))) + +(defn setup-blobstore [nimbus] + "Sets up blobstore state for all current keys." + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + local-set-of-keys (set (get-key-seq-from-blob-store blob-store)) + all-keys (set (.active-keys storm-cluster-state)) + locally-available-active-keys (set/intersection local-set-of-keys all-keys) + keys-to-delete (set/difference local-set-of-keys all-keys) + conf (:conf nimbus) + nimbus-host-port-info (:nimbus-host-port-info nimbus)] + (log-debug "Deleting keys not on the zookeeper" keys-to-delete) + (doseq [key keys-to-delete] + (.deleteBlob blob-store key nimbus-subject)) + (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys) + (doseq [key locally-available-active-keys] + (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))))) (defn- get-errors [storm-cluster-state storm-id component-id] (->> (.errors storm-cluster-state storm-id component-id) @@ -1102,26 +1226,26 @@ (catch Exception e (throw (AuthorizationException. (str "Invalid file path: " file-path)))))) -(defn try-read-storm-conf [conf storm-id] +(defn try-read-storm-conf + [conf storm-id blob-store] (try-cause - (read-storm-conf conf storm-id) - (catch FileNotFoundException e - (throw (NotAliveException. (str storm-id)))) - ) -) + (read-storm-conf-as-nimbus storm-id blob-store) + (catch KeyNotFoundException e + (throw (NotAliveException. (str storm-id)))))) -(defn try-read-storm-conf-from-name [conf storm-name nimbus] +(defn try-read-storm-conf-from-name + [conf storm-name nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) id (get-storm-id storm-cluster-state storm-name)] - (try-read-storm-conf conf id))) + (try-read-storm-conf conf id blob-store))) -(defn try-read-storm-topology [conf storm-id] +(defn try-read-storm-topology + [storm-id blob-store] (try-cause - (read-storm-topology conf storm-id) - (catch FileNotFoundException e - (throw (NotAliveException. (str storm-id)))) - ) -) + (read-storm-topology-as-nimbus storm-id blob-store) + (catch KeyNotFoundException e + (throw (NotAliveException. (str storm-id)))))) (defn add-topology-to-history-log [storm-id nimbus topology-conf] @@ -1166,6 +1290,7 @@ (defn renew-credentials [nimbus] (if (is-leader nimbus :throw-exception false) (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) renewers (:cred-renewers nimbus) update-lock (:cred-update-lock nimbus) assigned-ids (set (.active-storms storm-cluster-state))] @@ -1173,7 +1298,7 @@ (doseq [id assigned-ids] (locking update-lock (let [orig-creds (.credentials storm-cluster-state id nil) - topology-conf (try-read-storm-conf (:conf nimbus) id)] + topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)] (if orig-creds (let [new-creds (HashMap. orig-creds)] (doseq [renewer renewers] @@ -1210,22 +1335,40 @@ (.set_reset_log_level_timeout_epoch log-config (coerce/to-long timeout)) (.unset_reset_log_level_timeout_epoch log-config)))) +(defmethod blob-sync :distributed [conf nimbus] + (if (not (is-leader nimbus :throw-exception false)) + (let [storm-cluster-state (:storm-cluster-state nimbus) + nimbus-host-port-info (:nimbus-host-port-info nimbus) + blob-store-key-set (set (get-key-seq-from-blob-store (:blob-store nimbus))) + zk-key-set (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))] + (log-debug "blob-sync " "blob-store-keys " blob-store-key-set "zookeeper-keys " zk-key-set) + (let [sync-blobs (doto + (BlobSynchronizer. (:blob-store nimbus) conf) + (.setNimbusInfo nimbus-host-port-info) + (.setBlobStoreKeySet blob-store-key-set) + (.setZookeeperKeySet zk-key-set))] + (.syncBlobs sync-blobs))))) + +(defmethod blob-sync :local [conf nimbus] + nil) + (defserverfn service-handler [conf inimbus] (.prepare inimbus conf (master-inimbus-dir conf)) (log-message "Starting Nimbus with conf " conf) (let [nimbus (nimbus-data conf inimbus) + blob-store (:blob-store nimbus) principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf) admin-users (or (.get conf NIMBUS-ADMINS) []) get-common-topo-info (fn [^String storm-id operation] (let [storm-cluster-state (:storm-cluster-state nimbus) - topology-conf (try-read-storm-conf conf storm-id) + topology-conf (try-read-storm-conf conf storm-id blob-store) storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf operation) - topology (try-read-storm-topology conf storm-id) + topology (try-read-storm-topology storm-id blob-store) task->component (storm-task-info topology topology-conf) base (.storm-base storm-cluster-state storm-id nil) launch-time-secs (if base (:launch-time-secs base) @@ -1264,10 +1407,11 @@ (.addToLeaderLockQueue (:leader-elector nimbus)) (cleanup-corrupt-topologies! nimbus) - (setup-code-distributor nimbus) + (when (instance? LocalFsBlobStore blob-store) + ;register call back for blob-store + (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus))) + (setup-blobstore nimbus)) - ;register call back for code-distributor - (.code-distributor (:storm-cluster-state nimbus) (fn [] (sync-code conf nimbus))) (when (is-leader nimbus :throw-exception false) (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup))) @@ -1278,31 +1422,27 @@ (when-not (conf NIMBUS-DO-NOT-REASSIGN) (locking (:submit-lock nimbus) (mk-assignments nimbus))) - (do-cleanup nimbus) - )) + (do-cleanup nimbus))) ;; Schedule Nimbus inbox cleaner (schedule-recurring (:timer nimbus) 0 (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) (fn [] - (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)) - )) + (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))) + ;; Schedule nimbus code sync thread to sync code from other nimbuses. + (if (instance? LocalFsBlobStore blob-store) + (schedule-recurring (:timer nimbus) + 0 + (conf NIMBUS-CODE-SYNC-FREQ-SECS) + (fn [] + (blob-sync conf nimbus)))) ;; Schedule topology history cleaner (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] (schedule-recurring (:timer nimbus) 0 (conf LOGVIEWER-CLEANUP-INTERVAL-SECS) (fn [] - (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus) - ))) - ;;schedule nimbus code sync thread to sync code from other nimbuses. - (schedule-recurring (:timer nimbus) - 0 - (conf NIMBUS-CODE-SYNC-FREQ-SECS) - (fn [] - (sync-code conf nimbus) - )) - + (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))) (schedule-recurring (:timer nimbus) 0 (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS) @@ -1349,10 +1489,11 @@ principal (.principal req) submitter-principal (if principal (.toString principal)) submitter-user (.toLocal principal-to-local principal) + system-user (System/getProperty "user.name") topo-acl (distinct (remove nil? (conj (.get storm-conf-submitted TOPOLOGY-USERS) submitter-principal, submitter-user))) storm-conf (-> storm-conf-submitted (assoc TOPOLOGY-SUBMITTER-PRINCIPAL (if submitter-principal submitter-principal "")) - (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user "")) ;Don't let the user set who we launch as + (assoc TOPOLOGY-SUBMITTER-USER (if submitter-user submitter-user system-user)) ;Don't let the user set who we launch as (assoc TOPOLOGY-USERS topo-acl) (assoc STORM-ZOOKEEPER-SUPERACL (.get conf STORM-ZOOKEEPER-SUPERACL))) storm-conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) @@ -1380,8 +1521,8 @@ (check-storm-active! nimbus storm-name false) ;;cred-update-lock is not needed here because creds are being added for the first time. (.set-credentials! storm-cluster-state storm-id credentials storm-conf) - (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) - (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)) + (log-message "uploadedJar " uploadedJarLocation) + (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology) (wait-for-desired-code-replication nimbus total-storm-conf storm-id) (.setup-heartbeats! storm-cluster-state storm-id) (.setup-backpressure! storm-cluster-state storm-id) @@ -1456,7 +1597,7 @@ (mark! nimbus:num-debug-calls) (let [storm-cluster-state (:storm-cluster-state nimbus) storm-id (get-storm-id storm-cluster-state storm-name) - topology-conf (try-read-storm-conf conf storm-id) + topology-conf (try-read-storm-conf conf storm-id blob-store) ;; make sure samplingPct is within bounds. spct (Math/max (Math/min samplingPct 100.0) 0.0) ;; while disabling we retain the sampling pct. @@ -1475,7 +1616,7 @@ (^void setWorkerProfiler [this ^String id ^ProfileRequest profileRequest] (mark! nimbus:num-setWorkerProfiler-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler") storm-cluster-state (:storm-cluster-state nimbus)] @@ -1506,7 +1647,7 @@ (^void setLogConfig [this ^String id ^LogConfig log-config-msg] (mark! nimbus:num-setLogConfig-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf "setLogConfig") storm-cluster-state (:storm-cluster-state nimbus) @@ -1534,7 +1675,7 @@ (mark! nimbus:num-uploadNewCredentials-calls) (let [storm-cluster-state (:storm-cluster-state nimbus) storm-id (get-storm-id storm-cluster-state storm-name) - topology-conf (try-read-storm-conf conf storm-id) + topology-conf (try-read-storm-conf conf storm-id blob-store) creds (when credentials (.get_creds credentials))] (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials") (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf)))) @@ -1607,7 +1748,7 @@ (^LogConfig getLogConfig [this ^String id] (mark! nimbus:num-getLogConfig-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf "getLogConfig") storm-cluster-state (:storm-cluster-state nimbus) @@ -1616,24 +1757,24 @@ (^String getTopologyConf [this ^String id] (mark! nimbus:num-getTopologyConf-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME)] (check-authorization! nimbus storm-name topology-conf "getTopologyConf") (to-json topology-conf))) (^StormTopology getTopology [this ^String id] (mark! nimbus:num-getTopology-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME)] (check-authorization! nimbus storm-name topology-conf "getTopology") (system-topology! topology-conf (try-read-storm-topology conf id)))) (^StormTopology getUserTopology [this ^String id] (mark! nimbus:num-getUserTopology-calls) - (let [topology-conf (try-read-storm-conf conf id) + (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus)) storm-name (topology-conf TOPOLOGY-NAME)] (check-authorization! nimbus storm-name topology-conf "getUserTopology") - (try-read-storm-topology topology-conf id))) + (try-read-storm-topology id blob-store))) (^ClusterSummary getClusterInfo [this] (mark! nimbus:num-getClusterInfo-calls) @@ -1668,42 +1809,39 @@ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary)))))) topology-summaries (dofor [[id base] bases :when base] - (let [assignment (.assignment-info storm-cluster-state id nil) - topo-summ (TopologySummary. id - (:storm-name base) - (->> (:executor->node+port assignment) - keys - (mapcat executor-id->tasks) - count) - (->> (:executor->node+port assignment) - keys - count) - (->> (:executor->node+port assignment) - vals - set - count) - (time-delta (:launch-time-secs base)) - (extract-status-str base))] - (when-let [owner (:owner base)] (.set_owner topo-summ owner)) - (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) - (when-let [resources (.get @(:id->resources nimbus) id)] - (.set_requested_memonheap topo-summ (get resources 0)) - (.set_requested_memoffheap topo-summ (get resources 1)) - (.set_requested_cpu topo-summ (get resources 2)) - (.set_assigned_memonheap topo-summ (get resources 3)) - (.set_assigned_memoffheap topo-summ (get resources 4)) - (.set_assigned_cpu topo-summ (get resources 5))) - (.set_replication_count topo-summ (if (:code-distributor nimbus) - (.getReplicationCount (:code-distributor nimbus) id) - 1)) - topo-summ - )) + (let [assignment (.assignment-info storm-cluster-state id nil) + topo-summ (TopologySummary. id + (:storm-name base) + (->> (:executor->node+port assignment) + keys + (mapcat executor-id->tasks) + count) + (->> (:executor->node+port assignment) + keys + count) + (->> (:executor->node+port assignment) + vals + set + count) + (time-delta (:launch-time-secs base)) + (extract-status-str base))] + (when-let [owner (:owner base)] (.set_owner topo-summ owner)) + (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) + (when-let [resources (.get @(:id->resources nimbus) id)] + (.set_requested_memonheap topo-summ (get resources 0)) + (.set_requested_memoffheap topo-summ (get resources 1)) + (.set_requested_cpu topo-summ (get resources 2)) + (.set_assigned_memonheap topo-summ (get resources 3)) + (.set_assigned_memoffheap topo-summ (get resources 4)) + (.set_assigned_cpu topo-summ (get resources 5))) + (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus)) + topo-summ)) ret (ClusterSummary. supervisor-summaries topology-summaries nimbuses) _ (.set_nimbus_uptime_secs ret nimbus-uptime)] ret)) - + (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options] (mark! nimbus:num-getTopologyInfoWithOpts-calls) (let [{:keys [storm-name @@ -1763,10 +1901,8 @@ (.set_assigned_cpu topo-info (get resources 5))) (when-let [component->debug (:component->debug base)] (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug))) - (.set_replication_count topo-info (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 1)) - - topo-info - )) + (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) + topo-info)) (^TopologyInfo getTopologyInfo [this ^String topology-id] (mark! nimbus:num-getTopologyInfo-calls) @@ -1774,6 +1910,157 @@ topology-id (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL)))) + (^String beginCreateBlob [this + ^String blob-key + ^SettableBlobMeta blob-meta] + (let [session-id (uuid)] + (.put (:blob-uploaders nimbus) + session-id + (.createBlob (:blob-store nimbus) blob-key blob-meta user-subject)) + (log-message "Created blob for " blob-key + " with session id " session-id) + (str session-id))) + + (^String beginUpdateBlob [this ^String blob-key] + (let [^AtomicOutputStream os (.updateBlob (:blob-store nimbus) + blob-key user-subject)] + (let [session-id (uuid)] + (.put (:blob-uploaders nimbus) session-id os) + (log-message "Created upload session for " blob-key + " with id " session-id) + (str session-id)))) + + (^void createStateInZookeeper [this ^String blob-key] + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + nimbus-host-port-info (:nimbus-host-port-info nimbus) + conf (:conf nimbus)] + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) + (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info))) + + (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk] + (let [uploaders (:blob-uploaders nimbus)] + (if-let [^AtomicOutputStream os (.get uploaders session)] + (let [chunk-array (.array blob-chunk) + remaining (.remaining blob-chunk) + array-offset (.arrayOffset blob-chunk) + position (.position blob-chunk)] + (.write os chunk-array (+ array-offset position) remaining) + (.put uploaders session os)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)")))) + + (^void finishBlobUpload [this ^String session] + (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] + (do + (.close os) + (log-message "Finished uploading blob for session " + session + ". Closing session.") + (.remove (:blob-uploaders nimbus) session)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)"))) + + (^void cancelBlobUpload [this ^String session] + (if-let [^AtomicOutputStream os (.get (:blob-uploaders nimbus) session)] + (do + (.cancel os) + (log-message "Canceled uploading blob for session " + session + ". Closing session.") + (.remove (:blob-uploaders nimbus) session)) + (throw-runtime "Blob for session " + session + " does not exist (or timed out)"))) + + (^ReadableBlobMeta getBlobMeta [this ^String blob-key] + (let [^ReadableBlobMeta ret (.getBlobMeta (:blob-store nimbus) + blob-key user-subject)] + ret)) + + (^void setBlobMeta [this ^String blob-key ^SettableBlobMeta blob-meta] + (->> (ReqContext/context) + (.subject) + (.setBlobMeta (:blob-store nimbus) blob-key blob-meta))) + + (^BeginDownloadResult beginBlobDownload [this ^String blob-key] + (let [^InputStreamWithMeta is (.getBlob (:blob-store nimbus) + blob-key user-subject)] + (let [session-id (uuid) + ret (BeginDownloadResult. (.getVersion is) (str session-id))] + (.set_data_size ret (.getFileLength is)) + (.put (:blob-downloaders nimbus) session-id (BufferInputStream. is (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES) (int 65536)))) + (log-message "Created download session for " blob-key + " with id " session-id) + ret))) + + (^ByteBuffer downloadBlobChunk [this ^String session] + (let [downloaders (:blob-downloaders nimbus) + ^BufferInputStream is (.get downloaders session)] + (when-not is + (throw (RuntimeException. + "Could not find input stream for session " session))) + (let [ret (.read is)] + (.put downloaders session is) + (when (empty? ret) + (.close is) + (.remove downloaders session)) + (log-debug "Sending " (alength ret) " bytes") + (ByteBuffer/wrap ret)))) + + (^void deleteBlob [this ^String blob-key] + (let [subject (->> (ReqContext/context) + (.subject))] + (.deleteBlob (:blob-store nimbus) blob-key subject) + (when (instance? LocalFsBlobStore blob-store) + (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) + (.remove-key-version! (:storm-cluster-state nimbus) blob-key)) + (log-message "Deleted blob for key " blob-key))) + + (^ListBlobsResult listBlobs [this ^String session] + (let [listers (:blob-listers nimbus) + ^Iterator keys-it (if (clojure.string/blank? session) + (.listKeys (:blob-store nimbus)) + (.get listers session)) + _ (or keys-it (throw-runtime "Blob list for session " + session + " does not exist (or timed out)")) + + ;; Create a new session id if the user gave an empty session string. + ;; This is the use case when the user wishes to list blobs + ;; starting from the beginning. + session (if (clojure.string/blank? session) + (let [new-session (uuid)] + (log-message "Creating new session for downloading list " new-session) + new-session) + session)] + (if-not (.hasNext keys-it) + (do + (.remove listers session) + (log-message "No more blobs to list for session " session) + ;; A blank result communicates that there are no more blobs. + (ListBlobsResult. (ArrayList. 0) (str session))) + (let [^List list-chunk (->> keys-it + (iterator-seq) + (take 100) ;; Limit to next 100 keys + (ArrayList.))] + (log-message session " downloading " (.size list-chunk) " entries") + (.put listers session keys-it) + (ListBlobsResult. list-chunk (str session)))))) + + (^int getBlobReplication [this ^String blob-key] + (->> (ReqContext/context) + (.subject) + (.getBlobReplication (:blob-store nimbus) blob-key))) + + (^int updateBlobReplication [this ^String blob-key ^int replication] + (->> (ReqContext/context) + (.subject) + (.updateBlobReplication (:blob-store nimbus) blob-key replication))) + (^TopologyPageInfo getTopologyPageInfo [this ^String topo-id ^String window ^boolean include-sys?] (mark! nimbus:num-getTopologyPageInfo-calls) @@ -1807,9 +2094,8 @@ (.set_status (extract-status-str (:base info))) (.set_uptime_secs (time-delta (:launch-time-secs info))) (.set_topology_conf (to-json (try-read-storm-conf conf - topo-id))) - (.set_replication_count - (.getReplicationCount (:code-distributor nimbus) topo-id))) + topo-id (:blob-store nimbus)))) + (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus))) (when-let [debug-options (get-in info [:base :component->debug topo-id])] (.set_debug_options @@ -1889,60 +2175,14 @@ (.disconnect (:storm-cluster-state nimbus)) (.cleanup (:downloaders nimbus)) (.cleanup (:uploaders nimbus)) + (.shutdown (:blob-store nimbus)) (.close (:leader-elector nimbus)) - (if (:code-distributor nimbus) (.close (:code-distributor nimbus) (:conf nimbus))) (when (:nimbus-topology-action-notifier nimbus) (.cleanup (:nimbus-topology-action-notifier nimbus))) - (log-message "Shut down master") - ) + (log-message "Shut down master")) DaemonCommon (waiting? [this] (timer-waiting? (:timer nimbus)))))) -(defmethod mk-code-distributor :distributed [conf] - (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))] - (.prepare code-distributor conf) - code-distributor)) - -(defmethod mk-code-distributor :local [conf] - nil) - -(defn download-code [conf nimbus storm-id host port] - (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid)) - storm-cluster-state (:storm-cluster-state nimbus) - storm-root (master-stormdist-root conf storm-id) - remote-meta-file-path (master-storm-metafile-path storm-root) - local-meta-file-path (master-storm-metafile-path tmp-root)] - (FileUtils/forceMkdir (File. tmp-root)) - (Utils/downloadFromHost conf remote-meta-file-path local-meta-file-path host port) - (if (:code-distributor nimbus) - (.download (:code-distributor nimbus) storm-id (File. local-meta-file-path))) - (if (.exists (File. storm-root)) (FileUtils/forceDelete (File. storm-root))) - (FileUtils/moveDirectory (File. tmp-root) (File. storm-root)) - (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))) - -(defmethod sync-code :distributed [conf nimbus] - (let [storm-cluster-state (:storm-cluster-state nimbus) - active-topologies (set (.code-distributor storm-cluster-state (fn [] (sync-code conf nimbus)))) - missing-topologies (set/difference active-topologies (set (code-ids (:conf nimbus))))] - (if (not (empty? missing-topologies)) - (do - (.removeFromLeaderLockQueue (:leader-elector nimbus)) - (doseq [missing missing-topologies] - (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.") - (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)] - (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing)) - (doseq [nimbus-host-port nimbuses-with-missing] - (when-not (contains? (code-ids (:conf nimbus)) missing) - (try - (download-code conf nimbus missing (.getHost nimbus-host-port) (.getPort nimbus-host-port)) - (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing))))))))) - - (if (empty? (set/difference active-topologies (set (code-ids (:conf nimbus))))) - (.addToLeaderLockQueue (:leader-elector nimbus))))) - -(defmethod sync-code :local [conf nimbus] - nil) - (defn launch-server! [conf nimbus] (validate-distributed-mode! conf) (let [service-handler (service-handler conf nimbus)
