Repository: storm Updated Branches: refs/heads/master 05771bafa -> 879cb5b8f
STORM-2901: Reuse ZK connection for Nimbus Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/348faf94 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/348faf94 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/348faf94 Branch: refs/heads/master Commit: 348faf946864b86b1833ff1115d1c5706f638269 Parents: ee0c36d Author: chenyuzhao <[email protected]> Authored: Mon Jan 22 18:12:15 2018 +0800 Committer: chenyuzhao <[email protected]> Committed: Mon Jan 22 19:14:31 2018 +0800 ---------------------------------------------------------------------- .../apache/storm/command/shell_submission.clj | 12 +++++-- .../org/apache/storm/command/AdminCommands.java | 7 ---- .../test/clj/org/apache/storm/nimbus_test.clj | 12 +++---- .../apache/storm/security/auth/auth_test.clj | 2 +- .../apache/storm/blobstore/BlobStoreUtils.java | 7 ++-- .../storm/blobstore/BlobSynchronizer.java | 18 +++++----- .../storm/blobstore/KeySequenceNumber.java | 24 +++++-------- .../org/apache/storm/daemon/nimbus/Nimbus.java | 38 +++++++++++++++----- .../storm/zookeeper/LeaderElectorImp.java | 3 +- .../org/apache/storm/zookeeper/Zookeeper.java | 23 +++++++----- 10 files changed, 85 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 8aee299..3514040 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -14,19 +14,25 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.command.shell-submission - (:import [org.apache.storm StormSubmitter] + (:import [org.apache.storm Config StormSubmitter] [org.apache.storm.utils ServerUtils] [org.apache.storm.zookeeper Zookeeper]) (:use [org.apache.storm util config log]) (:require [clojure.string :as str]) - (:import [org.apache.storm.utils ConfigUtils]) + (:import [org.apache.storm.callback DefaultWatcherCallBack] + [org.apache.storm.utils ConfigUtils] + [org.apache.storm.zookeeper Zookeeper ClientZookeeper]) (:gen-class)) (defn -main [^String tmpjarpath & args] (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) + servers (.get conf Config/STORM_ZOOKEEPER_SERVERS) + port (.get conf Config/STORM_ZOOKEEPER_PORT) + root (.get conf Config/STORM_ZOOKEEPER_ROOT) + zk (ClientZookeeper/mkClient conf servers port root (DefaultWatcherCallBack.) conf) ; since this is not a purpose to add to leader lock queue, passing nil as blob-store and topo cache is ok - zk-leader-elector (Zookeeper/zkLeaderElector conf nil nil) + zk-leader-elector (Zookeeper/zkLeaderElector conf zk nil nil) leader-nimbus (.getLeader zk-leader-elector) host (.getHost leader-nimbus) port (.getPort leader-nimbus) http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index a052f8d..7f72a48 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -24,12 +24,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.curator.framework.CuratorFramework; -import org.apache.storm.Config; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.KeyFilter; import org.apache.storm.blobstore.LocalFsBlobStore; -import org.apache.storm.callback.DefaultWatcherCallBack; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; @@ -38,7 +35,6 @@ import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; -import org.apache.storm.zookeeper.ClientZookeeper; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; @@ -75,8 +71,6 @@ public class AdminCommands { private static void initialize() { conf = Utils.readStormConfig(); nimbusBlobStore = ServerUtils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf)); - List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); - Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); List<ACL> acls = null; if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { acls = adminZkAcls(); @@ -87,7 +81,6 @@ public class AdminCommands { LOG.error("admin can't create stormClusterState"); new RuntimeException(e); } - CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf); } // we might think of moving this method in Utils class http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 42bb788..03b7388 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -1311,7 +1311,7 @@ (with-open [zk (InProcessZookeeper. )] (with-open [tmp-nimbus-dir (TmpPath.) _ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))] (let [nimbus-dir (.getPath tmp-nimbus-dir)] (letlocals (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) @@ -1328,7 +1328,7 @@ {})) (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. false))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. false))))] (letlocals (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) @@ -1635,7 +1635,7 @@ _ (UtilsInstaller. fake-utils) - (StormCommonInstaller. fake-common) zk-le (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] nil))) + (zkLeaderElectorImpl [conf zk blob-store tc] nil))) mocked-cluster (MockedCluster. cluster-utils)] (mk-nimbus auth-conf fake-inimbus) (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any)) @@ -1706,7 +1706,7 @@ (with-open [zk (InProcessZookeeper. )] (with-open [tmp-nimbus-dir (TmpPath.) _ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))] (let [nimbus-dir (.getPath tmp-nimbus-dir)] (letlocals (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) @@ -1899,7 +1899,7 @@ mock-blob-store (Mockito/mock BlobStore) conf {NIMBUS-MONITOR-FREQ-SECS 10}] (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))] (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] (.set (.getHeartbeatsCache nimbus) hb-cache) (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos)) @@ -1944,7 +1944,7 @@ mock-blob-store (Mockito/mock BlobStore) conf {NIMBUS-MONITOR-FREQ-SECS 10}] (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (MockLeaderElector. ))))] (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] (.set (.getHeartbeatsCache nimbus) hb-cache) (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos)) http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj index 2b10ffd..a77849a 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj @@ -62,7 +62,7 @@ (defn nimbus-data [storm-conf inimbus] (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf blob-store tc] (Mockito/mock ILeaderElector))))] + (zkLeaderElectorImpl [conf zk blob-store tc] (Mockito/mock ILeaderElector))))] (org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil))) (defn dummy-service-handler http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java index fd1cf40..67b7caf 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java @@ -45,8 +45,13 @@ import org.slf4j.LoggerFactory; public class BlobStoreUtils { private static final String BLOBSTORE_SUBTREE="/blobstore"; + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class); + public static String getBlobStoreSubtree() { + return BLOBSTORE_SUBTREE; + } + public static CuratorFramework createZKClient(Map<String, Object> conf) { @SuppressWarnings("unchecked") List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); @@ -268,6 +273,4 @@ public class BlobStoreUtils { throw new RuntimeException(exp); } } - - } http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java index 62233a2..193ccac 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java @@ -17,17 +17,17 @@ */ package org.apache.storm.blobstore; +import java.nio.channels.ClosedByInterruptException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.channels.ClosedByInterruptException; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - /** * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper * for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus. @@ -58,6 +58,10 @@ public class BlobSynchronizer { this.blobStoreKeySet = blobStoreKeySet; } + public void setZkClient(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + public Set<String> getBlobStoreKeySet() { Set<String> keySet = new HashSet<String>(); keySet.addAll(blobStoreKeySet); @@ -73,7 +77,6 @@ public class BlobSynchronizer { public synchronized void syncBlobs() { try { LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); - zkClient = BlobStoreUtils.createZKClient(conf); deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); updateKeySetForBlobStore(getBlobStoreKeySet()); Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); @@ -90,9 +93,6 @@ public class BlobSynchronizer { LOG.debug("Detected deletion for the key {} while downloading - skipping download", key); } } - if (zkClient !=null) { - zkClient.close(); - } } catch(InterruptedException | ClosedByInterruptException exp) { LOG.error("Interrupt Exception {}", exp); } catch(Exception exp) { http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java index 4c202be..570e0ad 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java @@ -18,20 +18,20 @@ package org.apache.storm.blobstore; +import java.nio.ByteBuffer; +import java.util.TreeSet; +import java.util.List; +import java.util.Map; + +import org.apache.curator.framework.CuratorFramework; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; -import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.TreeSet; -import java.util.Map; -import java.util.List; - /** * Class hands over the key sequence number which implies the number of updates made to a blob. * The information regarding the keys and the sequence number which represents the number of updates are @@ -119,7 +119,6 @@ import java.util.List; */ public class KeySequenceNumber { private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class); - private final String BLOBSTORE_SUBTREE="/blobstore"; private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber"; private final String key; private final NimbusInfo nimbusInfo; @@ -131,12 +130,11 @@ public class KeySequenceNumber { this.nimbusInfo = nimbusInfo; } - public synchronized int getKeySequenceNumber(Map<String, Object> conf) throws KeyNotFoundException { + public synchronized int getKeySequenceNumber(CuratorFramework zkClient) throws KeyNotFoundException { TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>(); - CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf); try { // Key has not been created yet and it is the first time it is being created - if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) { + if (zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key) == null) { zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, @@ -147,7 +145,7 @@ public class KeySequenceNumber { // When all nimbodes go down and one or few of them come up // Unfortunately there might not be an exact way to know which one contains the most updated blob, // if all go down which is unlikely. Hence there might be a need to update the blob if all go down. - List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key); + List<String> stateInfoList = zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key); LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList); if (stateInfoList.isEmpty()) { return getMaxSequenceNumber(zkClient); @@ -207,10 +205,6 @@ public class KeySequenceNumber { // in other case, just set this to 0 to trigger re-sync later LOG.error("Exception {}", e); return INITIAL_SEQUENCE_NUMBER - 1; - } finally { - if (zkClient != null) { - zkClient.close(); - } } } http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index e66dbe5..afb1c28 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -53,6 +53,8 @@ import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.security.auth.Subject; + +import org.apache.curator.framework.CuratorFramework; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.DaemonConfig; @@ -64,6 +66,7 @@ import org.apache.storm.blobstore.BlobSynchronizer; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.blobstore.KeySequenceNumber; import org.apache.storm.blobstore.LocalFsBlobStore; +import org.apache.storm.callback.DefaultWatcherCallBack; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; @@ -177,6 +180,7 @@ import org.apache.storm.utils.Utils; import org.apache.storm.utils.Utils.UptimeComputer; import org.apache.storm.utils.VersionInfo; import org.apache.storm.validation.ConfigValidation; +import org.apache.storm.zookeeper.ClientZookeeper; import org.apache.storm.zookeeper.Zookeeper; import org.apache.thrift.TException; import org.apache.zookeeper.ZooDefs; @@ -570,9 +574,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return ret; } - private static int getVersionForKey(String key, NimbusInfo nimbusInfo, Map<String, Object> conf) throws KeyNotFoundException { + private static int getVersionForKey(String key, NimbusInfo nimbusInfo, + CuratorFramework zkClient) throws KeyNotFoundException { KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo); - return kseq.getKeySequenceNumber(conf); + return kseq.getKeySequenceNumber(zkClient); } private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, @@ -1023,6 +1028,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private IAuthorizer authorizationHandler; private final IAuthorizer impersonationAuthorizationHandler; private final AtomicLong submittedCount; + //Cached CuratorFramework, mainly used for BlobStore. + private CuratorFramework zkClient; private final IStormClusterState stormClusterState; private final Object submitLock = new Object(); private final Object schedLock = new Object(); @@ -1058,6 +1065,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private final List<ClusterMetricsConsumerExecutor> clusterConsumerExceutors; private final IGroupMappingServiceProvider groupMapper; private final IPrincipalToLocal principalToLocal; + + private static CuratorFramework makeZKClient(Map<String, Object> conf) { + List<String> servers = (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); + String root = (String)conf.get(Config.STORM_ZOOKEEPER_ROOT); + CuratorFramework ret = null; + if (servers != null && port != null) { + ret = ClientZookeeper.mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), conf); + } + return ret; + } private static IStormClusterState makeStormClusterState(Map<String, Object> conf) throws Exception { List<ACL> acls = null; @@ -1118,8 +1136,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { }); this.underlyingScheduler = makeScheduler(conf, inimbus); this.scheduler = wrapAsBlacklistScheduler(conf, underlyingScheduler); + this.zkClient = makeZKClient(conf); if (leaderElector == null) { - leaderElector = Zookeeper.zkLeaderElector(conf, blobStore, topoCache); + leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache); } this.leaderElector = leaderElector; this.idToSchedStatus = new AtomicReference<>(new HashMap<>()); @@ -1269,18 +1288,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { store.createBlob(jarKey, fin, new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject); } if (store instanceof LocalFsBlobStore) { - clusterState.setupBlobstore(jarKey, hostPortInfo, getVersionForKey(jarKey, hostPortInfo, conf)); + clusterState.setupBlobstore(jarKey, hostPortInfo, getVersionForKey(jarKey, hostPortInfo, zkClient)); } } topoCache.addTopoConf(topoId, subject, topoConf); if (store instanceof LocalFsBlobStore) { - clusterState.setupBlobstore(confKey, hostPortInfo, getVersionForKey(confKey, hostPortInfo, conf)); + clusterState.setupBlobstore(confKey, hostPortInfo, getVersionForKey(confKey, hostPortInfo, zkClient)); } topoCache.addTopology(topoId, subject, topology); if (store instanceof LocalFsBlobStore) { - clusterState.setupBlobstore(codeKey, hostPortInfo, getVersionForKey(codeKey, hostPortInfo, conf)); + clusterState.setupBlobstore(codeKey, hostPortInfo, getVersionForKey(codeKey, hostPortInfo, zkClient)); } } @@ -2153,7 +2172,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { LOG.debug("Creating list of key entries for blobstore inside zookeeper {} local {}", activeKeys, activeLocalKeys); for (String key: activeLocalKeys) { try { - state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, nimbusInfo, conf)); + state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, nimbusInfo, zkClient)); } catch (KeyNotFoundException e) { // invalid key, remove it from blobstore store.deleteBlob(key, NIMBUS_SUBJECT); @@ -3350,7 +3369,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { BlobStore store = blobStore; NimbusInfo ni = nimbusHostPortInfo; if (store instanceof LocalFsBlobStore) { - state.setupBlobstore(key, ni, getVersionForKey(key, ni, conf)); + state.setupBlobstore(key, ni, getVersionForKey(key, ni, zkClient)); } LOG.debug("Created state in zookeeper {} {} {}", state, store, ni); } catch (Exception e) { @@ -4153,6 +4172,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { if (actionNotifier != null) { actionNotifier.cleanup(); } + if (zkClient != null) { + zkClient.close(); + } LOG.info("Shut down master"); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java index 6bf39c3..5dfdf0b 100644 --- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java +++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java @@ -121,7 +121,6 @@ public class LeaderElectorImp implements ILeaderElector { @Override public void close() { - LOG.info("closing zookeeper connection of leader elector."); - zk.close(); + //Do nothing now. } } http://git-wip-us.apache.org/repos/asf/storm/blob/348faf94/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java index 802d3ba..8f46dbe 100644 --- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java @@ -138,7 +138,7 @@ public class Zookeeper { @Override public void isLeader() { Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk, - conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false)); + ClusterUtils.STORMS_SUBTREE, false)); Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds); Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys); @@ -250,17 +250,24 @@ public class Zookeeper { }; } - public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc) - throws UnknownHostException { - return _instance.zkLeaderElectorImpl(conf, blobStore, tc); + /** + * Get master leader elector. + * @param conf Config. + * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path. + * @param blobStore {@link BlobStore} + * @param tc {@link TopoCache} + * @return Instance of {@link ILeaderElector} + * @throws UnknownHostException + */ + public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, + BlobStore blobStore, final TopoCache tc) throws UnknownHostException { + return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc); } - protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc) + protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore, final TopoCache tc) throws UnknownHostException { List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); - Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); - CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), conf); - String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock"; + String leaderLockPath = "/leader-lock"; String id = NimbusInfo.fromConf(conf).toHostPortString(); AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
