STORM-166: some refactoring so host:port parsing happens in a centralized
location.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0bb3830
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0bb3830
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0bb3830

Branch: refs/heads/0.11.x-branch
Commit: d0bb3830a3e6fccbfb4d17ed170ed8f4e6056df0
Parents: 50fdcee
Author: Parth Brahmbhatt <[email protected]>
Authored: Fri Dec 19 11:26:26 2014 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Fri Dec 19 11:26:26 2014 -0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj   |  7 ++++---
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 20 +++++++++-----------
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   | 17 ++++++++++++++++-
 3 files changed, 29 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj 
b/storm-core/src/clj/backtype/storm/cluster.clj
index 572fc96..6078852 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -20,6 +20,7 @@
   (:import [backtype.storm.utils Utils])
   (:import [java.security MessageDigest])
   (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
+  (:import [backtype.storm.nimbus NimbusInfo])
   (:use [backtype.storm util log config])
   (:require [backtype.storm [zookeeper :as zk]])
   (:require [backtype.storm.daemon [common :as common]]))
@@ -328,7 +329,7 @@
 
       (code-distributor-info
         [this storm-id]
-        (get-children cluster-state (code-distributor-path storm-id) false))
+        (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children 
cluster-state (code-distributor-path storm-id) false)))
 
       (active-storms
         [this]
@@ -431,9 +432,9 @@
         (set-data cluster-state (assignment-path storm-id) (Utils/serialize 
info) acls))
 
       (setup-code-distributor!
-        [this storm-id info]
+        [this storm-id nimbusInfo]
         (mkdirs cluster-state (code-distributor-path storm-id) acls)
-        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" info) 
acls))
+        (mkdirs cluster-state (str (code-distributor-path storm-id) "/" 
(.toHostPortString nimbusInfo)) acls))
 
       (remove-storm!
         [this storm-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj 
b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e991de3..c652b41 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -25,6 +25,7 @@
             Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl 
DefaultScheduler ExecutorDetails])
   (:use [backtype.storm bootstrap util zookeeper])
   (:import [backtype.storm.generated AuthorizationException])
+  (:import [backtype.storm.nimbus NimbusInfo])
   (:use [backtype.storm bootstrap util])
   (:use [backtype.storm.config :only [validate-configs-with-schemas]])
   (:use [backtype.storm.daemon common])
@@ -78,7 +79,7 @@
 (defn nimbus-data [conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf conf
-     :host-port-info (str (.getCanonicalHostName (InetAddress/getLocalHost)) 
":" (conf NIMBUS-THRIFT-PORT))
+     :nimbus-host-port-info (NimbusInfo. (.getCanonicalHostName 
(InetAddress/getLocalHost)) (conf NIMBUS-THRIFT-PORT) false)
      :inimbus inimbus
      :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) 
conf)
      :submitted-count (atom 0)
@@ -1107,8 +1108,7 @@
                                 (dissoc storm-conf 
STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
                 total-storm-conf (merge conf storm-conf)
                 topology (normalize-topology total-storm-conf topology)
-                storm-cluster-state (:storm-cluster-state nimbus)
-                host-port-info (:host-port-info nimbus) ]
+                storm-cluster-state (:storm-cluster-state nimbus)]
             (when credentials (doseq [nimbus-autocred-plugin 
(:nimbus-autocred-plugins nimbus)]
               (.populateCredentials nimbus-autocred-plugin credentials 
(Collections/unmodifiableMap storm-conf))))
             (if (and (conf SUPERVISOR-RUN-WORKER-AS-USER) (or (nil? 
submitter-user) (.isEmpty (.trim submitter-user)))) 
@@ -1125,7 +1125,7 @@
               ;;cred-update-lock is not needed here because creds are being 
added for the first time.
               (.set-credentials! storm-cluster-state storm-id credentials 
storm-conf)
               (setup-storm-code nimbus conf storm-id uploadedJarLocation 
storm-conf topology)
-              (.setup-code-distributor! storm-cluster-state storm-id 
host-port-info)
+              (.setup-code-distributor! storm-cluster-state storm-id 
(:nimbus-host-port-info nimbus))
               (wait-for-desired-code-replication nimbus conf storm-id)
               (.setup-heartbeats! storm-cluster-state storm-id)
               (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE 
:inactive
@@ -1380,7 +1380,6 @@
 (defn download-code [conf nimbus storm-id host port]
   (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid))
         storm-cluster-state (:storm-cluster-state nimbus)
-        host-port-info (:host-port-info nimbus)
         storm-root (master-stormdist-root conf storm-id)
         remote-meta-file-path (master-storm-metafile-path storm-root)
         local-meta-file-path (master-storm-metafile-path tmp-root)]
@@ -1390,7 +1389,7 @@
       (.download (:bt-tracker nimbus) storm-id (File. local-meta-file-path)))
     (if (.exists (File. storm-root)) (FileUtils/forceDelete (File. 
storm-root)))
     (FileUtils/moveDirectory (File. tmp-root) (File. storm-root))
-    (.setup-code-distributor! storm-cluster-state storm-id host-port-info)))
+    (.setup-code-distributor! storm-cluster-state storm-id 
(:nimbus-host-port-info nimbus))))
 
 (defmethod sync-code :distributed [conf nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1405,11 +1404,10 @@
           (let [nimbuses-with-missing (.code-distributor-info 
storm-cluster-state missing)]
             (log-message "trying to download missing topology code from " 
(clojure.string/join "," nimbuses-with-missing))
             (doseq [nimbus-host-port nimbuses-with-missing]
-              (let [[host port] (clojure.string/split nimbus-host-port #":")]
-                (when-not (contains? (code-ids (:conf nimbus)) missing)
-                  (try
-                    (download-code conf nimbus missing host (Integer/parseInt 
port))
-                    (catch Exception e (log-error e "Exception while trying to 
syn-code for missing topology" missing))))))))))
+              (when-not (contains? (code-ids (:conf nimbus)) missing)
+                (try
+                  (download-code conf nimbus missing (.getHost 
nimbus-host-port) (.getPort nimbus-host-port))
+                  (catch Exception e (log-error e "Exception while trying to 
syn-code for missing topology" missing)))))))))
     (.addToLeaderLockQueue (:leader-elector nimbus))
     (log-message "local disk is completely in sync with zk 
code-distributor.")))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d0bb3830/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java 
b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
index 1baa461..e31090f 100644
--- a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
+++ b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
@@ -3,9 +3,11 @@ package backtype.storm.nimbus;
 import java.io.Serializable;
 
 public class NimbusInfo implements Serializable {
+    private static final String DELIM = ":";
+
     private String host;
     private int port;
-    private transient boolean isLeader;
+    private boolean isLeader;
 
     public NimbusInfo(String host, int port, boolean isLeader) {
         this.host = host;
@@ -13,6 +15,19 @@ public class NimbusInfo implements Serializable {
         this.isLeader = isLeader;
     }
 
+    public static NimbusInfo parse(String nimbusInfo) {
+        String[] hostAndPort = nimbusInfo.split(DELIM);
+        if(hostAndPort != null && hostAndPort.length == 2) {
+            return new NimbusInfo(hostAndPort[0], 
Integer.parseInt(hostAndPort[1]), false);
+        } else {
+            throw new RuntimeException("nimbusInfo should have format of 
host:port, invalid string " + nimbusInfo);
+        }
+    }
+
+    public String toHostPortString() {
+        return String.format("%s%s%s",host,DELIM,port);
+    }
+
     public boolean isLeader() {
         return isLeader;
     }

Reply via email to