Repository: storm
Updated Branches:
  refs/heads/nimbus-ha-branch dcee1e29c -> a11fcc303
minor bug fixes. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa69f2af Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa69f2af Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa69f2af Branch: refs/heads/nimbus-ha-branch Commit: fa69f2aff74b7a2485d2e7d07617f028fadeeb12 Parents: 678afed Author: Parth Brahmbhatt <[email protected]> Authored: Mon Dec 15 16:29:51 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Mon Dec 15 16:29:51 2014 -0800 ---------------------------------------------------------------------- .../src/clj/backtype/storm/daemon/nimbus.clj | 27 ++++++++++---------- storm-core/src/clj/backtype/storm/zookeeper.clj | 6 ++--- 2 files changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fa69f2af/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 31237b3..2afcbec 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -1228,7 +1228,6 @@ (let [tmp-root (str (master-tmp-dir conf) file-path-separator (uuid)) storm-cluster-state (:storm-cluster-state nimbus) host-port-info (:host-port-info nimbus) - code-dir (master-stormdist-root conf) storm-root (master-stormdist-root conf storm-id) remote-meta-file-path (master-storm-metafile-path storm-root) local-meta-file-path (master-storm-metafile-path tmp-root)] @@ -1246,18 +1245,20 @@ active-topologies (set (.code-distributor storm-cluster-state nil)) missing-topologies (set/difference active-topologies code-ids)] (if (not (empty? 
missing-topologies)) - (doseq [missing missing-topologies] - (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.") - (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)] - (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing)) - (doseq [nimbus-host-port nimbuses-with-missing] - (let [[host port] (clojure.string/split nimbus-host-port #":")] - (when-not (contains? (code-ids (:conf nimbus)) missing) - (try - (download-code conf nimbus missing host (Integer/parseInt port)) - (.addToLeaderLockQueue (:leader-elector nimbus)) - (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing)))))))) - (log-message "local disk is completely in sync with zk code-distributor.")))) + (do + (.removeFromLeaderLockQueue (:leader-elector nimbus)) + (doseq [missing missing-topologies] + (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.") + (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)] + (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing)) + (doseq [nimbus-host-port nimbuses-with-missing] + (let [[host port] (clojure.string/split nimbus-host-port #":")] + (when-not (contains? 
(code-ids (:conf nimbus)) missing) + (try + (download-code conf nimbus missing host (Integer/parseInt port)) + (catch Exception e (log-error e "Exception while trying to syn-code for missing topology" missing)))))))))) + (.addToLeaderLockQueue (:leader-elector nimbus)) + (log-message "local disk is completely in sync with zk code-distributor."))) (defmethod sync-code :local [conf nimbus] nil) http://git-wip-us.apache.org/repos/asf/storm/blob/fa69f2af/storm-core/src/clj/backtype/storm/zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj index 517a34a..a675990 100644 --- a/storm-core/src/clj/backtype/storm/zookeeper.clj +++ b/storm-core/src/clj/backtype/storm/zookeeper.clj @@ -225,7 +225,7 @@ is-leader (.isLeader participant)] (NimbusInfo. server port is-leader))) -(defn leader-latch-listener +(defn leader-latch-listener-impl "Leader latch listener that will be invoked when we either gain or lose leadership" [conf zk leader-latch] (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost)) @@ -255,7 +255,7 @@ leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") id (str (.getCanonicalHostName (InetAddress/getLocalHost)) ":" (conf NIMBUS-THRIFT-PORT)) leader-latch (atom (LeaderLatch. zk leader-lock-path id)) - leader-latch-listener (atom (leader-latch-listener conf zk @leader-latch)) + leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch)) ] (reify ILeaderElector (prepare [this conf] @@ -267,7 +267,7 @@ (if (.equals LeaderLatch$State/CLOSED state) (do (reset! leader-latch (LeaderLatch. zk leader-lock-path id)) - (reset! leader-latch-listener (leader-latch-listener conf zk @leader-latch)) + (reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch)) (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.") ))
