Repository: storm
Updated Branches:
  refs/heads/nimbus-ha-branch dcee1e29c -> a11fcc303


minor bug fixes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa69f2af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa69f2af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa69f2af

Branch: refs/heads/nimbus-ha-branch
Commit: fa69f2aff74b7a2485d2e7d07617f028fadeeb12
Parents: 678afed
Author: Parth Brahmbhatt <[email protected]>
Authored: Mon Dec 15 16:29:51 2014 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Mon Dec 15 16:29:51 2014 -0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 27 ++++++++++----------
 storm-core/src/clj/backtype/storm/zookeeper.clj |  6 ++---
 2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa69f2af/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 31237b3..2afcbec 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1228,7 +1228,6 @@
   (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid))
         storm-cluster-state (:storm-cluster-state nimbus)
         host-port-info (:host-port-info nimbus)
-        code-dir (master-stormdist-root conf)
         storm-root (master-stormdist-root conf storm-id)
         remote-meta-file-path (master-storm-metafile-path storm-root)
         local-meta-file-path (master-storm-metafile-path tmp-root)]
@@ -1246,18 +1245,20 @@
         active-topologies (set (.code-distributor storm-cluster-state nil))
         missing-topologies (set/difference active-topologies code-ids)]
     (if (not (empty? missing-topologies))
-      (doseq [missing missing-topologies]
-        (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
-        (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
-          (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
-          (doseq [nimbus-host-port nimbuses-with-missing]
-            (let [[host port] (clojure.string/split nimbus-host-port #":")]
-              (when-not (contains? (code-ids (:conf nimbus)) missing)
-                (try
-                  (download-code conf nimbus missing host (Integer/parseInt port))
-                  (.addToLeaderLockQueue (:leader-elector nimbus))
-                  (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing))))))))
-      (log-message "local disk is completely in sync with zk code-distributor."))))
+      (do
+        (.removeFromLeaderLockQueue (:leader-elector nimbus))
+        (doseq [missing missing-topologies]
+          (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
+          (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
+            (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
+            (doseq [nimbus-host-port nimbuses-with-missing]
+              (let [[host port] (clojure.string/split nimbus-host-port #":")]
+                (when-not (contains? (code-ids (:conf nimbus)) missing)
+                  (try
+                    (download-code conf nimbus missing host (Integer/parseInt port))
+                    (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing))))))))))
+    (.addToLeaderLockQueue (:leader-elector nimbus))
+    (log-message "local disk is completely in sync with zk code-distributor.")))
 
 (defmethod sync-code :local [conf nimbus]
   nil)

http://git-wip-us.apache.org/repos/asf/storm/blob/fa69f2af/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index 517a34a..a675990 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -225,7 +225,7 @@
      is-leader (.isLeader participant)]
     (NimbusInfo. server port is-leader)))
 
-(defn leader-latch-listener
+(defn leader-latch-listener-impl
   "Leader latch listener that will be invoked when we either gain or lose leadership"
   [conf zk leader-latch]
   (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
@@ -255,7 +255,7 @@
         leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
         id (str (.getCanonicalHostName (InetAddress/getLocalHost)) ":" (conf NIMBUS-THRIFT-PORT))
         leader-latch (atom (LeaderLatch. zk leader-lock-path id))
-        leader-latch-listener (atom (leader-latch-listener conf zk @leader-latch))
+        leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch))
         ]
     (reify ILeaderElector
       (prepare [this conf]
@@ -267,7 +267,7 @@
         (if (.equals LeaderLatch$State/CLOSED state)
           (do
             (reset! leader-latch (LeaderLatch. zk leader-lock-path id))
-            (reset! leader-latch-listener (leader-latch-listener conf zk @leader-latch))
+            (reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch))
             (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.")
             ))
 

Reply via email to