Repository: storm
Updated Branches:
  refs/heads/master 05771bafa -> 879cb5b8f


STORM-2901: Reuse ZK connection for Nimbus


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/348faf94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/348faf94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/348faf94

Branch: refs/heads/master
Commit: 348faf946864b86b1833ff1115d1c5706f638269
Parents: ee0c36d
Author: chenyuzhao <[email protected]>
Authored: Mon Jan 22 18:12:15 2018 +0800
Committer: chenyuzhao <[email protected]>
Committed: Mon Jan 22 19:14:31 2018 +0800

----------------------------------------------------------------------
 .../apache/storm/command/shell_submission.clj   | 12 +++++--
 .../org/apache/storm/command/AdminCommands.java |  7 ----
 .../test/clj/org/apache/storm/nimbus_test.clj   | 12 +++----
 .../apache/storm/security/auth/auth_test.clj    |  2 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |  7 ++--
 .../storm/blobstore/BlobSynchronizer.java       | 18 +++++-----
 .../storm/blobstore/KeySequenceNumber.java      | 24 +++++--------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 38 +++++++++++++++-----
 .../storm/zookeeper/LeaderElectorImp.java       |  3 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   | 23 +++++++-----
 10 files changed, 85 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8aee299..3514040 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -14,19 +14,25 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.command.shell-submission
-  (:import [org.apache.storm StormSubmitter]
+  (:import [org.apache.storm Config StormSubmitter]
            [org.apache.storm.utils ServerUtils]
            [org.apache.storm.zookeeper Zookeeper])
   (:use [org.apache.storm util config log])
   (:require [clojure.string :as str])
-  (:import [org.apache.storm.utils ConfigUtils])
+  (:import [org.apache.storm.callback DefaultWatcherCallBack]
+           [org.apache.storm.utils ConfigUtils]
+           [org.apache.storm.zookeeper Zookeeper ClientZookeeper])
   (:gen-class))
 
 
 (defn -main [^String tmpjarpath & args]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
+        servers (.get conf Config/STORM_ZOOKEEPER_SERVERS)
+        port (.get conf Config/STORM_ZOOKEEPER_PORT)
+        root (.get conf Config/STORM_ZOOKEEPER_ROOT)
+        zk (ClientZookeeper/mkClient conf servers port root 
(DefaultWatcherCallBack.) conf)
         ; since this is not a purpose to add to leader lock queue, passing nil 
as blob-store and topo cache is ok
-        zk-leader-elector (Zookeeper/zkLeaderElector conf nil nil)
+        zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index a052f8d..7f72a48 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -24,12 +24,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.KeyFilter;
 import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
@@ -38,7 +35,6 @@ import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.zookeeper.ClientZookeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
@@ -75,8 +71,6 @@ public class AdminCommands {
     private static void initialize() {
         conf = Utils.readStormConfig();
         nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, 
NimbusInfo.fromConf(conf));
-        List<String> servers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         List<ACL> acls = null;
         if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
             acls = adminZkAcls();
@@ -87,7 +81,6 @@ public class AdminCommands {
             LOG.error("admin can't create stormClusterState");
             new RuntimeException(e);
         }
-        CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, 
"", new DefaultWatcherCallBack(),conf);
     }
 
     // we might think of moving this method in Utils class

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 42bb788..03b7388 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1311,7 +1311,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
+                      (zkLeaderElectorImpl [conf zk blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1328,7 +1328,7 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. false))))]
+                          (zkLeaderElectorImpl [conf zk blob-store tc] 
(MockLeaderElector. false))))]
 
             (letlocals
               (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState 
conf nil (ClusterStateContext.)))
@@ -1635,7 +1635,7 @@
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf blob-store tc] nil)))
+                          (zkLeaderElectorImpl [conf zk blob-store tc] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
           (mk-nimbus auth-conf fake-inimbus)
           (.mkStormClusterStateImpl (Mockito/verify cluster-utils 
(Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
@@ -1706,7 +1706,7 @@
   (with-open [zk (InProcessZookeeper. )]
     (with-open [tmp-nimbus-dir (TmpPath.)
                 _ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus-dir (.getPath tmp-nimbus-dir)]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1899,7 +1899,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil 
mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. 
inactive-topos))
@@ -1944,7 +1944,7 @@
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                    (zkLeaderElectorImpl [conf blob-store tc] 
(MockLeaderElector. ))))]
+                    (zkLeaderElectorImpl [conf zk blob-store tc] 
(MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil 
mock-blob-store nil nil))]
         (.set (.getHeartbeatsCache nimbus) hb-cache)
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set 
inactive-topos))

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index 2b10ffd..a77849a 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -62,7 +62,7 @@
 
 (defn nimbus-data [storm-conf inimbus]
   (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                  (zkLeaderElectorImpl [conf blob-store tc] (Mockito/mock 
ILeaderElector))))]
+                  (zkLeaderElectorImpl [conf zk blob-store tc] (Mockito/mock 
ILeaderElector))))]
     (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock 
IStormClusterState) nil (Mockito/mock BlobStore) nil nil)))
 
 (defn dummy-service-handler

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index fd1cf40..67b7caf 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -45,8 +45,13 @@ import org.slf4j.LoggerFactory;
 
 public class BlobStoreUtils {
     private static final String BLOBSTORE_SUBTREE="/blobstore";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreUtils.class);
 
+    public static String getBlobStoreSubtree() {
+        return BLOBSTORE_SUBTREE;
+    }
+
     public static CuratorFramework createZKClient(Map<String, Object> conf) {
         @SuppressWarnings("unchecked")
         List<String> zkServers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
@@ -268,6 +273,4 @@ public class BlobStoreUtils {
             throw new RuntimeException(exp);
         }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
index 62233a2..193ccac 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,17 +17,17 @@
  */
 package org.apache.storm.blobstore;
 
+import java.nio.channels.ClosedByInterruptException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.channels.ClosedByInterruptException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Is called periodically and updates the nimbus with blobs based on the state 
stored inside the zookeeper
  * for a non leader nimbus trying to be in sync with the operations performed 
on the leader nimbus.
@@ -58,6 +58,10 @@ public class BlobSynchronizer {
         this.blobStoreKeySet = blobStoreKeySet;
     }
 
+    public void setZkClient(CuratorFramework zkClient) {
+        this.zkClient = zkClient;
+    }
+
     public Set<String> getBlobStoreKeySet() {
         Set<String> keySet = new HashSet<String>();
         keySet.addAll(blobStoreKeySet);
@@ -73,7 +77,6 @@ public class BlobSynchronizer {
     public synchronized void syncBlobs() {
         try {
             LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys 
{}",getBlobStoreKeySet(), getZookeeperKeySet());
-            zkClient = BlobStoreUtils.createZKClient(conf);
             deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), 
getZookeeperKeySet());
             updateKeySetForBlobStore(getBlobStoreKeySet());
             Set<String> keySetToDownload = 
getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
@@ -90,9 +93,6 @@ public class BlobSynchronizer {
                     LOG.debug("Detected deletion for the key {} while 
downloading - skipping download", key);
                 }
             }
-            if (zkClient !=null) {
-                zkClient.close();
-            }
         } catch(InterruptedException | ClosedByInterruptException exp) {
             LOG.error("Interrupt Exception {}", exp);
         } catch(Exception exp) {

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
index 4c202be..570e0ad 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,20 +18,20 @@
 
 package org.apache.storm.blobstore;
 
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.TreeSet;
-import java.util.Map;
-import java.util.List;
-
 /**
  * Class hands over the key sequence number which implies the number of 
updates made to a blob.
  * The information regarding the keys and the sequence number which represents 
the number of updates are
@@ -119,7 +119,6 @@ import java.util.List;
  */
 public class KeySequenceNumber {
     private static final Logger LOG = 
LoggerFactory.getLogger(KeySequenceNumber.class);
-    private final String BLOBSTORE_SUBTREE="/blobstore";
     private final String 
BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
     private final String key;
     private final NimbusInfo nimbusInfo;
@@ -131,12 +130,11 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public synchronized int getKeySequenceNumber(Map<String, Object> conf) 
throws KeyNotFoundException {
+    public synchronized int getKeySequenceNumber(CuratorFramework zkClient) 
throws KeyNotFoundException {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
-        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {
             // Key has not been created yet and it is the first time it is 
being created
-            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) 
== null) {
+            if 
(zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + 
key) == null) {
                 
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                         
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE
 + "/" + key);
                 zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE 
+ "/" + key,
@@ -147,7 +145,7 @@ public class KeySequenceNumber {
             // When all nimbodes go down and one or few of them come up
             // Unfortunately there might not be an exact way to know which one 
contains the most updated blob,
             // if all go down which is unlikely. Hence there might be a need 
to update the blob if all go down.
-            List<String> stateInfoList = 
zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+            List<String> stateInfoList = 
zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + 
key);
             LOG.debug("stateInfoList-size {} stateInfoList-data {}", 
stateInfoList.size(), stateInfoList);
             if (stateInfoList.isEmpty()) {
                 return getMaxSequenceNumber(zkClient);
@@ -207,10 +205,6 @@ public class KeySequenceNumber {
             // in other case, just set this to 0 to trigger re-sync later
             LOG.error("Exception {}", e);
             return INITIAL_SEQUENCE_NUMBER - 1;
-        } finally {
-            if (zkClient != null) {
-                zkClient.close();
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e66dbe5..afb1c28 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -53,6 +53,8 @@ import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.security.auth.Subject;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
@@ -64,6 +66,7 @@ import org.apache.storm.blobstore.BlobSynchronizer;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.KeySequenceNumber;
 import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
@@ -177,6 +180,7 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Utils.UptimeComputer;
 import org.apache.storm.utils.VersionInfo;
 import org.apache.storm.validation.ConfigValidation;
+import org.apache.storm.zookeeper.ClientZookeeper;
 import org.apache.storm.zookeeper.Zookeeper;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.ZooDefs;
@@ -570,9 +574,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
     
-    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, 
Map<String, Object> conf) throws KeyNotFoundException {
+    private static int getVersionForKey(String key, NimbusInfo nimbusInfo,
+        CuratorFramework zkClient) throws KeyNotFoundException {
         KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo);
-        return kseq.getKeySequenceNumber(conf);
+        return kseq.getKeySequenceNumber(zkClient);
     }
     
     private static StormTopology readStormTopology(String topoId, TopoCache 
tc) throws KeyNotFoundException, AuthorizationException,
@@ -1023,6 +1028,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private IAuthorizer authorizationHandler;
     private final IAuthorizer impersonationAuthorizationHandler;
     private final AtomicLong submittedCount;
+    //Cached CuratorFramework, mainly used for BlobStore.
+    private CuratorFramework zkClient;
     private final IStormClusterState stormClusterState;
     private final Object submitLock = new Object();
     private final Object schedLock = new Object();
@@ -1058,6 +1065,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final List<ClusterMetricsConsumerExecutor> 
clusterConsumerExceutors;
     private final IGroupMappingServiceProvider groupMapper;
     private final IPrincipalToLocal principalToLocal;
+
+    private static CuratorFramework makeZKClient(Map<String, Object> conf) {
+        List<String> servers = 
(List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        String root = (String)conf.get(Config.STORM_ZOOKEEPER_ROOT);
+        CuratorFramework ret = null;
+        if (servers != null && port != null) {
+            ret = ClientZookeeper.mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack(), conf);
+        }
+        return ret;
+    }
     
     private static IStormClusterState makeStormClusterState(Map<String, 
Object> conf) throws Exception {
         List<ACL> acls = null;
@@ -1118,8 +1136,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         });
         this.underlyingScheduler = makeScheduler(conf, inimbus);
         this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler);
+        this.zkClient = makeZKClient(conf);
         if (leaderElector == null) {
-            leaderElector = Zookeeper.zkLeaderElector(conf, blobStore, 
topoCache);
+            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, 
blobStore, topoCache);
         }
         this.leaderElector = leaderElector;
         this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
@@ -1269,18 +1288,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 store.createBlob(jarKey, fin, new 
SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
             }
             if (store instanceof LocalFsBlobStore) {
-                clusterState.setupBlobstore(jarKey, hostPortInfo, 
getVersionForKey(jarKey, hostPortInfo, conf));
+                clusterState.setupBlobstore(jarKey, hostPortInfo, 
getVersionForKey(jarKey, hostPortInfo, zkClient));
             }
         }
 
         topoCache.addTopoConf(topoId, subject, topoConf);
         if (store instanceof LocalFsBlobStore) {
-            clusterState.setupBlobstore(confKey, hostPortInfo, 
getVersionForKey(confKey, hostPortInfo, conf));
+            clusterState.setupBlobstore(confKey, hostPortInfo, 
getVersionForKey(confKey, hostPortInfo, zkClient));
         }
 
         topoCache.addTopology(topoId, subject, topology);
         if (store instanceof LocalFsBlobStore) {
-            clusterState.setupBlobstore(codeKey, hostPortInfo, 
getVersionForKey(codeKey, hostPortInfo, conf));
+            clusterState.setupBlobstore(codeKey, hostPortInfo, 
getVersionForKey(codeKey, hostPortInfo, zkClient));
         }
     }
 
@@ -2153,7 +2172,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         LOG.debug("Creating list of key entries for blobstore inside zookeeper 
{} local {}", activeKeys, activeLocalKeys);
         for (String key: activeLocalKeys) {
             try {
-                state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, 
nimbusInfo, conf));
+                state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, 
nimbusInfo, zkClient));
             } catch (KeyNotFoundException e) {
                 // invalid key, remove it from blobstore
                 store.deleteBlob(key, NIMBUS_SUBJECT);
@@ -3350,7 +3369,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             BlobStore store = blobStore;
             NimbusInfo ni = nimbusHostPortInfo;
             if (store instanceof LocalFsBlobStore) {
-                state.setupBlobstore(key, ni, getVersionForKey(key, ni, conf));
+                state.setupBlobstore(key, ni, getVersionForKey(key, ni, 
zkClient));
             }
             LOG.debug("Created state in zookeeper {} {} {}", state, store, ni);
         } catch (Exception e) {
@@ -4153,6 +4172,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (actionNotifier != null) {
                 actionNotifier.cleanup();
             }
+            if (zkClient != null) {
+                zkClient.close();
+            }
             LOG.info("Shut down master");
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 6bf39c3..5dfdf0b 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -121,7 +121,6 @@ public class LeaderElectorImp implements ILeaderElector {
 
     @Override
     public void close() {
-        LOG.info("closing zookeeper connection of leader elector.");
-        zk.close();
+        //Do nothing now.
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index 802d3ba..8f46dbe 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -138,7 +138,7 @@ public class Zookeeper {
             @Override
             public void isLeader() {
                 Set<String> activeTopologyIds = new 
TreeSet<>(ClientZookeeper.getChildren(zk,
-                    conf.get(Config.STORM_ZOOKEEPER_ROOT) + 
ClusterUtils.STORMS_SUBTREE, false));
+                    ClusterUtils.STORMS_SUBTREE, false));
 
                 Set<String> activeTopologyBlobKeys = 
populateTopologyBlobKeys(activeTopologyIds);
                 Set<String> activeTopologyCodeKeys = 
filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -250,17 +250,24 @@ public class Zookeeper {
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, 
BlobStore blobStore, final TopoCache tc)
-        throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf, blobStore, tc);
+    /**
+     * Get master leader elector.
+     * @param conf Config.
+     * @param zkClient ZkClient, the client must have a default 
Config.STORM_ZOOKEEPER_ROOT as root path.
+     * @param blobStore {@link BlobStore}
+     * @param tc {@link TopoCache}
+     * @return Instance of {@link ILeaderElector}
+     * @throws UnknownHostException
+     */
+    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, 
CuratorFramework zkClient,
+        BlobStore blobStore, final TopoCache tc) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, 
BlobStore blobStore, final TopoCache tc)
+    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, 
CuratorFramework zk, BlobStore blobStore, final TopoCache tc)
         throws UnknownHostException {
         List<String> servers = (List<String>) 
conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, 
"", new DefaultWatcherCallBack(), conf);
-        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + 
"/leader-lock";
+        String leaderLockPath = "/leader-lock";
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new 
AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> 
leaderLatchListenerAtomicReference =

Reply via email to