STORM-166: some refactoring so host:port parsing happens in a centralized location.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0bb3830 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0bb3830 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0bb3830 Branch: refs/heads/0.11.x-branch Commit: d0bb3830a3e6fccbfb4d17ed170ed8f4e6056df0 Parents: 50fdcee Author: Parth Brahmbhatt <[email protected]> Authored: Fri Dec 19 11:26:26 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Fri Dec 19 11:26:26 2014 -0800 ---------------------------------------------------------------------- storm-core/src/clj/backtype/storm/cluster.clj | 7 ++++--- .../src/clj/backtype/storm/daemon/nimbus.clj | 20 +++++++++----------- .../jvm/backtype/storm/nimbus/NimbusInfo.java | 17 ++++++++++++++++- 3 files changed, 29 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/clj/backtype/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 572fc96..6078852 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -20,6 +20,7 @@ (:import [backtype.storm.utils Utils]) (:import [java.security MessageDigest]) (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) + (:import [backtype.storm.nimbus NimbusInfo]) (:use [backtype.storm util log config]) (:require [backtype.storm [zookeeper :as zk]]) (:require [backtype.storm.daemon [common :as common]])) @@ -328,7 +329,7 @@ (code-distributor-info [this storm-id] - (get-children cluster-state (code-distributor-path storm-id) false)) + (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state (code-distributor-path storm-id) false))) (active-storms [this] @@ -431,9 +432,9 
@@ (set-data cluster-state (assignment-path storm-id) (Utils/serialize info) acls)) (setup-code-distributor! - [this storm-id info] + [this storm-id nimbusInfo] (mkdirs cluster-state (code-distributor-path storm-id) acls) - (mkdirs cluster-state (str (code-distributor-path storm-id) "/" info) acls)) + (mkdirs cluster-state (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo)) acls)) (remove-storm! [this storm-id] http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e991de3..c652b41 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -25,6 +25,7 @@ Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) (:use [backtype.storm bootstrap util zookeeper]) (:import [backtype.storm.generated AuthorizationException]) + (:import [backtype.storm.nimbus NimbusInfo]) (:use [backtype.storm bootstrap util]) (:use [backtype.storm.config :only [validate-configs-with-schemas]]) (:use [backtype.storm.daemon common]) @@ -78,7 +79,7 @@ (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf - :host-port-info (str (.getCanonicalHostName (InetAddress/getLocalHost)) ":" (conf NIMBUS-THRIFT-PORT)) + :nimbus-host-port-info (NimbusInfo. 
(.getCanonicalHostName (InetAddress/getLocalHost)) (conf NIMBUS-THRIFT-PORT) false) :inimbus inimbus :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf) :submitted-count (atom 0) @@ -1107,8 +1108,7 @@ (dissoc storm-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) total-storm-conf (merge conf storm-conf) topology (normalize-topology total-storm-conf topology) - storm-cluster-state (:storm-cluster-state nimbus) - host-port-info (:host-port-info nimbus) ] + storm-cluster-state (:storm-cluster-state nimbus)] (when credentials (doseq [nimbus-autocred-plugin (:nimbus-autocred-plugins nimbus)] (.populateCredentials nimbus-autocred-plugin credentials (Collections/unmodifiableMap storm-conf)))) (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? submitter-user) (.isEmpty (.trim submitter-user)))) @@ -1125,7 +1125,7 @@ ;;cred-update-lock is not needed here because creds are being added for the first time. (.set-credentials! storm-cluster-state storm-id credentials storm-conf) (setup-storm-code nimbus conf storm-id uploadedJarLocation storm-conf topology) - (.setup-code-distributor! storm-cluster-state storm-id host-port-info) + (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)) (wait-for-desired-code-replication nimbus conf storm-id) (.setup-heartbeats! storm-cluster-state storm-id) (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive @@ -1380,7 +1380,6 @@ (defn download-code [conf nimbus storm-id host port] (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid)) storm-cluster-state (:storm-cluster-state nimbus) - host-port-info (:host-port-info nimbus) storm-root (master-stormdist-root conf storm-id) remote-meta-file-path (master-storm-metafile-path storm-root) local-meta-file-path (master-storm-metafile-path tmp-root)] @@ -1390,7 +1389,7 @@ (.download (:bt-tracker nimbus) storm-id (File. local-meta-file-path))) (if (.exists (File. 
storm-root)) (FileUtils/forceDelete (File. storm-root))) (FileUtils/moveDirectory (File. tmp-root) (File. storm-root)) - (.setup-code-distributor! storm-cluster-state storm-id host-port-info))) + (.setup-code-distributor! storm-cluster-state storm-id (:nimbus-host-port-info nimbus)))) (defmethod sync-code :distributed [conf nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) @@ -1405,11 +1404,10 @@ (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)] (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing)) (doseq [nimbus-host-port nimbuses-with-missing] - (let [[host port] (clojure.string/split nimbus-host-port #":")] - (when-not (contains? (code-ids (:conf nimbus)) missing) - (try - (download-code conf nimbus missing host (Integer/parseInt port)) - (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing)))))))))) + (when-not (contains? (code-ids (:conf nimbus)) missing) + (try + (download-code conf nimbus missing (.getHost nimbus-host-port) (.getPort nimbus-host-port)) + (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing))))))))) (.addToLeaderLockQueue (:leader-elector nimbus)) (log-message "local disk is completely in sync with zk code-distributor."))) http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java index 1baa461..e31090f 100644 --- a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java +++ b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java @@ -3,9 +3,11 @@ package backtype.storm.nimbus; import java.io.Serializable; public class NimbusInfo implements Serializable { + private static final String DELIM = ":"; 
+ private String host; private int port; - private transient boolean isLeader; + private boolean isLeader; public NimbusInfo(String host, int port, boolean isLeader) { this.host = host; @@ -13,6 +15,19 @@ public class NimbusInfo implements Serializable { this.isLeader = isLeader; } + public static NimbusInfo parse(String nimbusInfo) { + String[] hostAndPort = nimbusInfo.split(DELIM); + if(hostAndPort != null && hostAndPort.length == 2) { + return new NimbusInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1]), false); + } else { + throw new RuntimeException("nimbusInfo should have format of host:port, invalid string " + nimbusInfo); + } + } + + public String toHostPortString() { + return String.format("%s%s%s",host,DELIM,port); + } + public boolean isLeader() { return isLeader; }
