This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 899e04e3b2eb101efa671e699b168fd0e4357880
Author: Hunter Lee <[email protected]>
AuthorDate: Wed Feb 5 11:12:50 2020 -0800

    Add MetadataStoreDirectory and ZkMetadataStoreDirectory (#720)
    
    MetadataStoreDirectory is an object that provides Metadata Store Directory 
APIs (routing APIs, CRUD of routing data, etc.). Helix REST will use this 
object to serve Metadata Store Directory Service REST endpoints.
    Also, it will make appropriate changes to the ZK access layer to listen on 
changes on the routing data.
    
    Changelist:
    1. Refactor AbstractTestClass to make multi-ZK setup work
    2. Add implementation of MetadataStoreDirectory
    3. Add TestZkMetadataStoreDirectory
---
 .../rest/metadatastore/MetadataStoreDirectory.java | 119 ++++++++++
 .../rest/metadatastore/RoutingDataListener.java    |  27 +++
 .../metadatastore/ZkMetadataStoreDirectory.java    | 193 +++++++++++++++++
 .../rest/metadatastore/ZkRoutingDataReader.java    | 143 ++++++++++--
 .../constant/MetadataStoreRoutingConstants.java    |  27 +++
 .../TestZkMetadataStoreDirectory.java              | 240 +++++++++++++++++++++
 .../metadatastore/TestZkRoutingDataReader.java     |  52 ++---
 .../helix/rest/server/AbstractTestClass.java       | 158 +++++++-------
 8 files changed, 843 insertions(+), 116 deletions(-)

diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java
new file mode 100644
index 0000000..032362a
--- /dev/null
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java
@@ -0,0 +1,119 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+
+/**
+ * MetadataStoreDirectory interface that provides methods that are used to 
route requests to appropriate metadata store realm.
+ *
+ * namespace: tied to a namespace used in Helix REST (Metadata Store Directory 
Service endpoints will be served by Helix REST deployables)
+ * realm: a metadata store deployable/ensemble. for example, if an application 
wishes to use 3 ZK quorums, then each ZK quorum would be considered a realm (ZK 
realm)
+ * metadata store path sharding key: assuming the metadata store exposes a 
file-system-like API, this sharding key denotes the key that maps to a 
particular metadata store realm. an example of a key is a cluster name mapping 
to a particular ZK realm (ZK address)
+ */
+public interface MetadataStoreDirectory extends AutoCloseable {
+
+  /**
+   * Retrieves all existing namespaces in the routing metadata store.
+   * @return
+   */
+  Collection<String> getAllNamespaces();
+
+  /**
+   * Returns all metadata store realms in the given namespace.
+   * @return
+   */
+  Collection<String> getAllMetadataStoreRealms(String namespace);
+
+  /**
+   * Returns all path-based sharding keys in the given namespace.
+   * @return
+   */
+  Collection<String> getAllShardingKeys(String namespace);
+
+  /**
+   * Returns all path-based sharding keys in the given namespace and the realm.
+   * @param namespace
+   * @param realm
+   * @return
+   */
+  Collection<String> getAllShardingKeysInRealm(String namespace, String realm);
+
+  /**
+   * Returns all (sharding key, realm) mappings whose sharding key has the 
given path as the prefix substring.
+   * E.g) Given that there are sharding keys: /a/b/c, /a/b/d, /a/e,
+   * getAllMappingUnderPath(namespace, "/a/b") returns ["/a/b/c": 
"realm", "/a/b/d": "realm"].
+   * @param namespace
+   * @param path
+   * @return
+   */
+  Map<String, String> getAllMappingUnderPath(String namespace, String path);
+
+  /**
+   * Returns the name of the metadata store realm based on the namespace and 
the sharding key given.
+   * @param namespace
+   * @param shardingKey
+   * @return
+   */
+  String getMetadataStoreRealm(String namespace, String shardingKey)
+      throws NoSuchElementException;
+
+  /**
+   * Creates a realm. If the namespace does not exist, it creates one.
+   * @param namespace
+   * @param realm
+   * @return true if successful or if the realm already exists. false 
otherwise.
+   */
+  boolean addMetadataStoreRealm(String namespace, String realm);
+
+  /**
+   * Deletes a realm.
+   * @param namespace
+   * @param realm
+   * @return true if successful or the realm or namespace does not exist. 
false otherwise.
+   */
+  boolean deleteMetadataStoreRealm(String namespace, String realm);
+
+  /**
+   * Creates a mapping between the sharding key to the realm in the given 
namespace.
+   * @param namespace
+   * @param realm
+   * @param shardingKey
+   * @return false if failed
+   */
+  boolean addShardingKey(String namespace, String realm, String shardingKey);
+
+  /**
+   * Deletes the mapping between the sharding key to the realm in the given 
namespace.
+   * @param namespace
+   * @param realm
+   * @param shardingKey
+   * @return false if failed; true if the deletion is successful or the key 
does not exist.
+   */
+  boolean deleteShardingKey(String namespace, String realm, String 
shardingKey);
+
+  /**
+   * Close MetadataStoreDirectory.
+   */
+  void close();
+}
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java
new file mode 100644
index 0000000..a44fc17
--- /dev/null
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java
@@ -0,0 +1,27 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface RoutingDataListener {
+  /**
+   * Callback for updating the internally-cached routing data.
+   */
+  void refreshRoutingData(String namespace);
+}
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
new file mode 100644
index 0000000..85f8f4a
--- /dev/null
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -0,0 +1,193 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient.ZkClientConfig;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import 
org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZK-based MetadataStoreDirectory that listens on the routing data in routing 
ZKs with an update callback.
+ */
+public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, 
RoutingDataListener {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkMetadataStoreDirectory.class);
+
+  // TODO: enable the line below when implementation is complete
+  // The following maps' keys represent the namespace
+  private final Map<String, MetadataStoreRoutingDataReader> 
_routingDataReaderMap;
+  private final Map<String, MetadataStoreRoutingData> _routingDataMap;
+  private final Map<String, String> _routingZkAddressMap;
+  // <namespace, <realm, <list of sharding keys>>> mappings
+  private final Map<String, Map<String, List<String>>> _realmToShardingKeysMap;
+
+  /**
+   * Creates a ZkMetadataStoreDirectory based on the given routing ZK 
addresses.
+   * @param routingZkAddressMap (namespace, routing ZK connect string)
+   * @throws InvalidRoutingDataException
+   */
+  public ZkMetadataStoreDirectory(Map<String, String> routingZkAddressMap)
+      throws InvalidRoutingDataException {
+    if (routingZkAddressMap == null || routingZkAddressMap.isEmpty()) {
+      throw new InvalidRoutingDataException("Routing ZK Addresses given are 
invalid!");
+    }
+    _routingDataReaderMap = new HashMap<>();
+    _routingZkAddressMap = routingZkAddressMap;
+    _realmToShardingKeysMap = new ConcurrentHashMap<>();
+    _routingDataMap = new ConcurrentHashMap<>();
+
+    // Create RoutingDataReaders
+    for (Map.Entry<String, String> routingEntry : 
_routingZkAddressMap.entrySet()) {
+      _routingDataReaderMap.put(routingEntry.getKey(),
+          new ZkRoutingDataReader(routingEntry.getKey(), 
routingEntry.getValue(), this));
+
+      // Populate realmToShardingKeys with ZkRoutingDataReader
+      _realmToShardingKeysMap.put(routingEntry.getKey(),
+          _routingDataReaderMap.get(routingEntry.getKey()).getRoutingData());
+    }
+  }
+
+  @Override
+  public Collection<String> getAllNamespaces() {
+    return Collections.unmodifiableCollection(_routingZkAddressMap.keySet());
+  }
+
+  @Override
+  public Collection<String> getAllMetadataStoreRealms(String namespace) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not 
exist!");
+    }
+    return 
Collections.unmodifiableCollection(_realmToShardingKeysMap.get(namespace).keySet());
+  }
+
+  @Override
+  public Collection<String> getAllShardingKeys(String namespace) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not 
exist!");
+    }
+    Set<String> allShardingKeys = new HashSet<>();
+    _realmToShardingKeysMap.get(namespace).values().forEach(keys -> 
allShardingKeys.addAll(keys));
+    return allShardingKeys;
+  }
+
+  @Override
+  public Collection<String> getAllShardingKeysInRealm(String namespace, String 
realm) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not 
exist!");
+    }
+    if (!_realmToShardingKeysMap.get(namespace).containsKey(realm)) {
+      throw new NoSuchElementException(
+          "Realm " + realm + " does not exist in namespace " + namespace);
+    }
+    return 
Collections.unmodifiableCollection(_realmToShardingKeysMap.get(namespace).get(realm));
+  }
+
+  @Override
+  public Map<String, String> getAllMappingUnderPath(String namespace, String 
path) {
+    // TODO: get it from routingData
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getMetadataStoreRealm(String namespace, String shardingKey) {
+    // TODO: get it from routingData
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addMetadataStoreRealm(String namespace, String realm) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean deleteMetadataStoreRealm(String namespace, String realm) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addShardingKey(String namespace, String realm, String 
shardingKey) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean deleteShardingKey(String namespace, String realm, String 
shardingKey) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Callback for updating the cached routing data of a single namespace.
+   * Note: this method should not synchronize on the class or the map. We do not want namespaces
+   * blocking each other.
+   * Threadsafe map is used for _realmToShardingKeysMap.
+   * The global consistency of the in-memory routing data is not a requirement (eventual
+   * consistency is enough).
+   * A callback for an unknown namespace, or for a namespace whose reader has not been registered
+   * yet, is logged and ignored.
+   * @param namespace namespace whose routing data should be re-read from its routing ZK
+   */
+  @Override
+  public void refreshRoutingData(String namespace) {
+    // Check if namespace exists; otherwise, log it and return as a NOP
+    if (!_routingZkAddressMap.containsKey(namespace)) {
+      LOG.error("Failed to refresh internally-cached routing data! Namespace not found: " + namespace);
+      return;
+    }
+
+    // The reader subscribes its ZK listeners inside its own constructor, so a callback can arrive
+    // before the constructor of this class has put the reader into _routingDataReaderMap. The
+    // constructor populates the cache for that namespace right after the put, so skipping is safe.
+    MetadataStoreRoutingDataReader routingDataReader = _routingDataReaderMap.get(namespace);
+    if (routingDataReader == null) {
+      LOG.warn("Routing data reader is not registered yet for namespace: " + namespace
+          + ". Skipping the refresh.");
+      return;
+    }
+
+    try {
+      _realmToShardingKeysMap.put(namespace, routingDataReader.getRoutingData());
+    } catch (InvalidRoutingDataException e) {
+      // Keep the previously cached mapping, but do not lose the cause of the failure
+      LOG.error("Failed to get routing data for namespace: " + namespace + "!", e);
+    }
+
+    if (_routingDataMap != null) {
+      MetadataStoreRoutingData newRoutingData =
+          new TrieRoutingData(new TrieRoutingData.TrieNode(null, null, false, null));
+      // TODO call constructRoutingData() here.
+      _routingDataMap.put(namespace, newRoutingData);
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    
_routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
+  }
+}
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
index a4c7e1c..453180f 100644
--- 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
@@ -22,54 +22,163 @@ package org.apache.helix.rest.metadatastore;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import 
org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
 import 
org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.zookeeper.Watcher;
 
-public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader {
-  static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA";
-  static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
 
+public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, 
IZkDataListener, IZkChildListener, IZkStateListener {
+  private final String _namespace;
   private final String _zkAddress;
   private final HelixZkClient _zkClient;
+  private final RoutingDataListener _routingDataListener;
+
+  public ZkRoutingDataReader(String namespace, String zkAddress) {
+    this(namespace, zkAddress, null);
+  }
 
-  public ZkRoutingDataReader(String zkAddress) {
+  public ZkRoutingDataReader(String namespace, String zkAddress,
+      RoutingDataListener routingDataListener) {
+    if (namespace == null || namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace cannot be null or empty!");
+    }
+    _namespace = namespace;
+    if (zkAddress == null || zkAddress.isEmpty()) {
+      throw new IllegalArgumentException("Zk address cannot be null or 
empty!");
+    }
     _zkAddress = zkAddress;
-    _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
-        new HelixZkClient.ZkConnectionConfig(zkAddress),
-        new HelixZkClient.ZkClientConfig().setZkSerializer(new 
ZNRecordSerializer()));
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new 
ZNRecordSerializer()));
+    _routingDataListener = routingDataListener;
+    if (_routingDataListener != null) {
+      // Subscribe child changes
+      
_zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH,
 this);
+      // Subscribe data changes
+      for (String child : 
_zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+        _zkClient
+            
.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + 
child,
+                this);
+      }
+    }
   }
 
-  public Map<String, List<String>> getRoutingData() throws 
InvalidRoutingDataException {
+  /**
+   * Returns (realm, list of ZK path sharding keys) mappings.
+   * @return
+   * @throws InvalidRoutingDataException
+   */
+  public Map<String, List<String>> getRoutingData()
+      throws InvalidRoutingDataException {
     Map<String, List<String>> routingData = new HashMap<>();
     List<String> children;
     try {
-      children = _zkClient.getChildren(ROUTING_DATA_PATH);
+      children = 
_zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
     } catch (ZkNoNodeException e) {
-      throw new InvalidRoutingDataException("Routing data directory ZNode " + 
ROUTING_DATA_PATH
-          + " does not exist. Routing ZooKeeper address: " + _zkAddress);
+      throw new InvalidRoutingDataException(
+          "Routing data directory ZNode " + 
MetadataStoreRoutingConstants.ROUTING_DATA_PATH
+              + " does not exist. Routing ZooKeeper address: " + _zkAddress);
     }
     if (children == null || children.isEmpty()) {
       throw new InvalidRoutingDataException(
           "There are no metadata store realms defined. Routing ZooKeeper 
address: " + _zkAddress);
     }
     for (String child : children) {
-      ZNRecord record = _zkClient.readData(ROUTING_DATA_PATH + "/" + child);
-      List<String> shardingKeys = record.getListField(ZNRECORD_LIST_FIELD_KEY);
+      ZNRecord record =
+          _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + 
"/" + child);
+      List<String> shardingKeys =
+          
record.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
       if (shardingKeys == null || shardingKeys.isEmpty()) {
-        throw new InvalidRoutingDataException("Realm address ZNode " + 
ROUTING_DATA_PATH + "/"
-            + child + " does not have a value for key " + 
ZNRECORD_LIST_FIELD_KEY
-            + ". Routing ZooKeeper address: " + _zkAddress);
+        throw new InvalidRoutingDataException(
+            "Realm address ZNode " + 
MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child
+                + " does not have a value for key "
+                + MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY
+                + ". Routing ZooKeeper address: " + _zkAddress);
       }
       routingData.put(child, shardingKeys);
     }
     return routingData;
   }
 
-  public void close() {
+  public synchronized void close() {
+    _zkClient.unsubscribeAll();
     _zkClient.close();
   }
+
+  @Override
+  public synchronized void handleDataChange(String s, Object o)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleDataDeleted(String s)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+
+    // Renew subscription
+    
_zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH,
 this);
+    for (String child : 
_zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      
_zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH 
+ "/" + child,
+          this);
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  /**
+   * Called when the set of realm ZNodes under the routing data path changes. Re-registers all
+   * listeners against the current children and asks the listener to refresh its cached data.
+   * @param s path whose children changed (unused)
+   * @param list current children reported by the callback (unused; children are re-read from ZK)
+   */
+  @Override
+  public synchronized void handleChildChange(String s, List<String> list)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+
+    // Subscribe data changes again because some children might have been deleted or added.
+    // unsubscribeAll() removes the child listener as well, so it must be registered again;
+    // otherwise realms added or removed after this callback would never be noticed.
+    _zkClient.unsubscribeAll();
+    _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
+    for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+          this);
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleNewSession(String sessionId)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleSessionEstablishmentError(Throwable error)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
 }
diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
new file mode 100644
index 0000000..fda355b
--- /dev/null
+++ 
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
@@ -0,0 +1,27 @@
+package org.apache.helix.rest.metadatastore.constant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class MetadataStoreRoutingConstants {
+  public static final String ROUTING_DATA_PATH = 
"/METADATA_STORE_ROUTING_DATA";
+
+  // For ZK only
+  public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
+}
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
new file mode 100644
index 0000000..7b0a4f0
--- /dev/null
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
@@ -0,0 +1,240 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import 
org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import 
org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkMetadataStoreDirectory extends AbstractTestClass {
+  /**
+   * The following are constants to be used for testing.
+   */
+  private static final String TEST_REALM_1 = "testRealm1";
+  private static final List<String> TEST_SHARDING_KEYS_1 =
+      Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", 
"/sharding/key/1/c");
+  private static final String TEST_REALM_2 = "testRealm2";
+  private static final List<String> TEST_SHARDING_KEYS_2 =
+      Arrays.asList("/sharding/key/1/d", "/sharding/key/1/e", 
"/sharding/key/1/f");
+  private static final String TEST_REALM_3 = "testRealm3";
+  private static final List<String> TEST_SHARDING_KEYS_3 =
+      Arrays.asList("/sharding/key/1/x", "/sharding/key/1/y", 
"/sharding/key/1/z");
+
+  // List of all ZK addresses, each of which corresponds to a 
namespace/routing ZK
+  private List<String> _zkList;
+  // <Namespace, ZkAddr> mapping
+  private Map<String, String> _routingZkAddrMap;
+  private MetadataStoreDirectory _metadataStoreDirectory;
+
+  /**
+   * Maps every test ZK to its own namespace, writes two realms with their sharding keys into each
+   * routing ZK, and creates the MetadataStoreDirectory under test.
+   */
+  @BeforeClass
+  public void beforeClass()
+      throws InvalidRoutingDataException {
+    _zkList = new ArrayList<>(ZK_SERVER_MAP.keySet());
+
+    // Populate routingZkAddrMap
+    _routingZkAddrMap = new LinkedHashMap<>();
+    int namespaceIndex = 0;
+    String namespacePrefix = "namespace_";
+    for (String zk : _zkList) {
+      // Advance the index so that each routing ZK gets a distinct namespace key
+      _routingZkAddrMap.put(namespacePrefix + namespaceIndex++, zk);
+    }
+
+    // Write dummy mappings in ZK
+    // Create a node that represents a realm address and add 3 sharding keys to it
+    ZNRecord znRecord = new ZNRecord("RoutingInfo");
+
+    _zkList.forEach(zk -> {
+      ZK_SERVER_MAP.get(zk).getZkClient().setZkSerializer(new ZNRecordSerializer());
+      // Write first realm and sharding keys pair
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_1);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              true);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              znRecord);
+
+      // Create another realm and sharding keys pair
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_2);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
+              true);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
+              znRecord);
+    });
+
+    // Create metadataStoreDirectory
+    _metadataStoreDirectory = new ZkMetadataStoreDirectory(_routingZkAddrMap);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _metadataStoreDirectory.close();
+    _zkList.forEach(zk -> ZK_SERVER_MAP.get(zk).getZkClient()
+        .deleteRecursive(MetadataStoreRoutingConstants.ROUTING_DATA_PATH));
+  }
+
+  @Test
+  public void testGetAllNamespaces() {
+    Assert.assertEquals(_metadataStoreDirectory.getAllNamespaces(), 
_routingZkAddrMap.keySet());
+  }
+
+  @Test(dependsOnMethods = "testGetAllNamespaces")
+  public void testGetAllMetadataStoreRealms() {
+    Set<String> realms = new HashSet<>();
+    realms.add(TEST_REALM_1);
+    realms.add(TEST_REALM_2);
+
+    for (String namespace : _routingZkAddrMap.keySet()) {
+      
Assert.assertEquals(_metadataStoreDirectory.getAllMetadataStoreRealms(namespace),
 realms);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetAllMetadataStoreRealms")
+  public void testGetAllShardingKeys() {
+    Set<String> allShardingKeys = new HashSet<>();
+    allShardingKeys.addAll(TEST_SHARDING_KEYS_1);
+    allShardingKeys.addAll(TEST_SHARDING_KEYS_2);
+
+    for (String namespace : _routingZkAddrMap.keySet()) {
+      
Assert.assertEquals(_metadataStoreDirectory.getAllShardingKeys(namespace), 
allShardingKeys);
+    }
+  }
+
+  /**
+   * For every namespace, each realm's sharding keys must equal the key list paired with that
+   * realm (TEST_REALM_1 with TEST_SHARDING_KEYS_1, TEST_REALM_2 with TEST_SHARDING_KEYS_2).
+   */
+  @Test(dependsOnMethods = "testGetAllShardingKeys")
+  public void testGetAllShardingKeysInRealm() {
+    _routingZkAddrMap.keySet().forEach(namespace -> {
+      // The two realms are asserted separately so a failure points at the offending realm
+      Assert.assertEquals(
+          _metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_1),
+          TEST_SHARDING_KEYS_1);
+      Assert.assertEquals(
+          _metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_2),
+          TEST_SHARDING_KEYS_2);
+    });
+  }
+
+  /**
+   * Appends a sharding key to TEST_REALM_1 on every routing ZooKeeper and then polls until the
+   * directory reports that key for every namespace.
+   *
+   * @throws Exception propagated from TestHelper.verify
+   */
+  @Test(dependsOnMethods = "testGetAllShardingKeysInRealm")
+  public void testDataChangeCallback()
+      throws Exception {
+    // For all namespaces (Routing ZKs), add an extra sharding key to TEST_REALM_1
+    String newKey = "/a/b/c/d/e";
+    _zkList.forEach(zk -> {
+      ZNRecord znRecord = ZK_SERVER_MAP.get(zk).getZkClient()
+          .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1);
+      znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY).add(newKey);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              znRecord);
+    });
+
+    // Verify that the sharding keys have been updated for EVERY namespace. Returning the
+    // result of the first namespace from inside the loop would let the test pass while the
+    // remaining namespaces are still stale, so only a failed check returns early.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          if (!_metadataStoreDirectory.getAllShardingKeys(namespace).contains(newKey)
+              || !_metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_1)
+              .contains(newKey)) {
+            return false;
+          }
+        } catch (NoSuchElementException e) {
+          // Data for this namespace is not readable yet - keep waiting for the callback
+          return false;
+        }
+      }
+      // An empty namespace map verifies nothing, so it must not count as success
+      return !_routingZkAddrMap.isEmpty();
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  /**
+   * Creates TEST_REALM_3 with TEST_SHARDING_KEYS_3 on every routing ZooKeeper and polls until
+   * the directory reports the new realm for every namespace; then appends one more key to
+   * TEST_REALM_3 and polls until that key is reported for every namespace as well.
+   *
+   * @throws Exception propagated from TestHelper.verify
+   */
+  @Test(dependsOnMethods = "testDataChangeCallback")
+  public void testChildChangeCallback()
+      throws Exception {
+    // For all namespaces (Routing ZKs), add a realm with a sharding key list
+    _zkList.forEach(zk -> {
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              true);
+      ZNRecord znRecord = new ZNRecord("RoutingInfo");
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_3);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              znRecord);
+    });
+
+    // Verify that the new realm and sharding keys have been updated in-memory via callback
+    // for EVERY namespace; only a failed check returns early from the loop.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          if (!_metadataStoreDirectory.getAllMetadataStoreRealms(namespace)
+              .contains(TEST_REALM_3)
+              || !_metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_3)
+              .containsAll(TEST_SHARDING_KEYS_3)) {
+            return false;
+          }
+        } catch (NoSuchElementException e) {
+          // Data for this namespace is not readable yet - keep waiting for the callback
+          return false;
+        }
+      }
+      // An empty namespace map verifies nothing, so it must not count as success
+      return !_routingZkAddrMap.isEmpty();
+    }, TestHelper.WAIT_DURATION));
+
+    // Since there was a child change callback, make sure data change works on the new child
+    // (realm) as well by adding a key.
+    // This tests removing all subscriptions and subscribing with new children list.
+    // For all namespaces (Routing ZKs), add an extra sharding key to TEST_REALM_3
+    // NOTE(review): this is the same key string that testDataChangeCallback (a prerequisite of
+    // this test) adds to TEST_REALM_1 - confirm a key shared by two realms is valid routing
+    // data; it also means only the per-realm check below can still be false at this point.
+    String newKey = "/a/b/c/d/e";
+    _zkList.forEach(zk -> {
+      ZNRecord znRecord = ZK_SERVER_MAP.get(zk).getZkClient()
+          .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3);
+      znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY).add(newKey);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              znRecord);
+    });
+
+    // Verify that the sharding keys have been updated for EVERY namespace
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          if (!_metadataStoreDirectory.getAllShardingKeys(namespace).contains(newKey)
+              || !_metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_3)
+              .contains(newKey)) {
+            return false;
+          }
+        } catch (NoSuchElementException e) {
+          // Data for this namespace is not readable yet - keep waiting for the callback
+          return false;
+        }
+      }
+      // An empty namespace map verifies nothing, so it must not count as success
+      return !_routingZkAddrMap.isEmpty();
+    }, TestHelper.WAIT_DURATION));
+  }
+}
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
index d06c38d..4479f68 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
@@ -23,8 +23,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
+import 
org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
 import 
org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
 import org.apache.helix.rest.server.AbstractTestClass;
 import org.testng.Assert;
@@ -33,12 +35,14 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 public class TestZkRoutingDataReader extends AbstractTestClass {
+  private static final String DUMMY_NAMESPACE = "NAMESPACE";
   private MetadataStoreRoutingDataReader _zkRoutingDataReader;
 
   @BeforeClass
   public void beforeClass() {
-    _zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
+    _zkRoutingDataReader = new ZkRoutingDataReader(DUMMY_NAMESPACE, ZK_ADDR, 
null);
   }
 
   @AfterClass
@@ -48,7 +52,7 @@ public class TestZkRoutingDataReader extends 
AbstractTestClass {
 
   @AfterMethod
   public void afterMethod() {
-    _baseAccessor.remove(ZkRoutingDataReader.ROUTING_DATA_PATH, 
AccessOption.PERSISTENT);
+    _baseAccessor.remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, 
AccessOption.PERSISTENT);
   }
 
   @Test
@@ -57,23 +61,24 @@ public class TestZkRoutingDataReader extends 
AbstractTestClass {
     ZNRecord testZnRecord1 = new ZNRecord("testZnRecord1");
     List<String> testShardingKeys1 =
         Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", 
"/sharding/key/1/c");
-    testZnRecord1.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY, 
testShardingKeys1);
+    testZnRecord1
+        .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, 
testShardingKeys1);
 
     // Create another node that represents a realm address and add 3 sharding 
keys to it
     ZNRecord testZnRecord2 = new ZNRecord("testZnRecord2");
-    List<String> testShardingKeys2 = Arrays.asList("/sharding/key/2/a", 
"/sharding/key/2/b",
-        "/sharding/key/2/c", "/sharding/key/2/d");
-    testZnRecord2.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY, 
testShardingKeys2);
+    List<String> testShardingKeys2 = Arrays
+        .asList("/sharding/key/2/a", "/sharding/key/2/b", "/sharding/key/2/c", 
"/sharding/key/2/d");
+    testZnRecord2
+        .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, 
testShardingKeys2);
 
     // Add both nodes as children nodes to 
ZkRoutingDataReader.ROUTING_DATA_PATH
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + 
"/testRealmAddress1",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + 
"/testRealmAddress1",
         testZnRecord1, AccessOption.PERSISTENT);
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + 
"/testRealmAddress2",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + 
"/testRealmAddress2",
         testZnRecord2, AccessOption.PERSISTENT);
 
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new 
ZkRoutingDataReader(ZK_ADDR);
     try {
-      Map<String, List<String>> routingData = 
zkRoutingDataReader.getRoutingData();
+      Map<String, List<String>> routingData = 
_zkRoutingDataReader.getRoutingData();
       Assert.assertEquals(routingData.size(), 2);
       Assert.assertEquals(routingData.get("testRealmAddress1"), 
testShardingKeys1);
       Assert.assertEquals(routingData.get("testRealmAddress2"), 
testShardingKeys2);
@@ -84,24 +89,22 @@ public class TestZkRoutingDataReader extends 
AbstractTestClass {
 
   @Test
   public void testGetRoutingDataMissingMSRD() {
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new 
ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Routing data directory ZNode " + 
ZkRoutingDataReader.ROUTING_DATA_PATH
+      Assert.assertTrue(e.getMessage().contains(
+          "Routing data directory ZNode " + 
MetadataStoreRoutingConstants.ROUTING_DATA_PATH
               + " does not exist. Routing ZooKeeper address: " + ZK_ADDR));
     }
   }
 
   @Test
   public void testGetRoutingDataMissingMSRDChildren() {
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH, new 
ZNRecord("test"),
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, new 
ZNRecord("test"),
         AccessOption.PERSISTENT);
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new 
ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -112,20 +115,19 @@ public class TestZkRoutingDataReader extends 
AbstractTestClass {
   @Test
   public void testGetRoutingDataMSRDChildEmptyValue() {
     ZNRecord testZnRecord1 = new ZNRecord("testZnRecord1");
-    testZnRecord1.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY,
+    
testZnRecord1.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
         Collections.emptyList());
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + 
"/testRealmAddress1",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + 
"/testRealmAddress1",
         testZnRecord1, AccessOption.PERSISTENT);
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new 
ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Realm address ZNode " + 
ZkRoutingDataReader.ROUTING_DATA_PATH
+      Assert.assertTrue(e.getMessage().contains(
+          "Realm address ZNode " + 
MetadataStoreRoutingConstants.ROUTING_DATA_PATH
               + "/testRealmAddress1 does not have a value for key "
-              + ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY + ". Routing 
ZooKeeper address: "
-              + ZK_ADDR));
+              + MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY
+              + ". Routing ZooKeeper address: " + ZK_ADDR));
     }
   }
 }
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java 
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 0302758..e6ecb82 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -66,8 +66,6 @@ import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
@@ -92,13 +90,19 @@ import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+
 public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
+  /**
+   * Constants for multi-ZK environment.
+   */
   private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
   private static final String NUM_ZK_PROPERTY_KEY = "numZk";
-  private static final String ZK_PREFIX = "localhost:";
-  private static final int ZK_START_PORT = 2123;
-  protected Map<String, ZkServer> _zkServerMap;
+  protected static final String ZK_PREFIX = "localhost:";
+  protected static final int ZK_START_PORT = 2123;
+  // The following map must be a static map because it needs to be shared 
throughout tests
+  protected static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
 
+  // For a single-ZK/Helix environment
   protected static final String ZK_ADDR = "localhost:2123";
   protected static final String WORKFLOW_PREFIX = "Workflow_";
   protected static final String JOB_PREFIX = "Job_";
@@ -154,58 +158,19 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
 
   @Override
   protected Application configure() {
-    // start zk
-    _zkServerMap = new HashMap<>();
-    try {
-      if (_zkServer == null) {
-        _zkServer = TestHelper.startZkServer(ZK_ADDR);
-        Assert.assertNotNull(_zkServer);
-        _zkServerMap.put(ZK_ADDR, _zkServer);
-        ZKClientPool.reset();
-      }
-
-      if (_zkServerTestNS == null) {
-        _zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS);
-        Assert.assertNotNull(_zkServerTestNS);
-        _zkServerMap.put(_zkAddrTestNS, _zkServerTestNS);
-        ZKClientPool.reset();
-      }
-    } catch (Exception e) {
-      Assert.fail(String.format("Failed to start ZK server: %s", 
e.toString()));
-    }
-
-    // Start additional ZKs in a multi-ZK setup
-    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
-    if (multiZkConfig != null && 
multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
-      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
-      if (numZkFromConfig != null) {
-        try {
-          int numZkFromConfigInt = Integer.parseInt(numZkFromConfig);
-          // Start (numZkFromConfigInt - 2) ZooKeepers
-          for (int i = 2; i < numZkFromConfigInt; i++) {
-            String zkAddr = ZK_PREFIX + (ZK_START_PORT + i);
-            ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-            Assert.assertNotNull(zkServer);
-            _zkServerMap.put(zkAddr, zkServer);
-          }
-        } catch (Exception e) {
-          Assert.fail("Failed to create multiple ZooKeepers!");
-        }
-      }
-    }
-
     // Configure server context
     ResourceConfig resourceConfig = new ResourceConfig();
     resourceConfig.packages(AbstractResource.class.getPackage().getName());
     ServerContext serverContext = new ServerContext(ZK_ADDR);
     resourceConfig.property(ContextPropertyKeys.SERVER_CONTEXT.name(), 
serverContext);
-    resourceConfig.register(new AuditLogFilter(Arrays.<AuditLogger>asList(new 
MockAuditLogger())));
+    resourceConfig.register(new AuditLogFilter(Collections.singletonList(new 
MockAuditLogger())));
 
     return resourceConfig;
   }
 
   @Override
-  protected TestContainerFactory getTestContainerFactory() throws 
TestContainerException {
+  protected TestContainerFactory getTestContainerFactory()
+      throws TestContainerException {
     return new TestContainerFactory() {
       @Override
       public TestContainer create(final URI baseUri, DeploymentContext 
deploymentContext) {
@@ -234,7 +199,7 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
               try {
                 _helixRestServer =
                     new HelixRestServer(namespaces, baseUri.getPort(), 
baseUri.getPath(),
-                        Arrays.<AuditLogger>asList(_auditLogger));
+                        Collections.singletonList(_auditLogger));
                 _helixRestServer.start();
               } catch (Exception ex) {
                 throw new TestContainerException(ex);
@@ -251,8 +216,11 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
   }
 
   @BeforeSuite
-  public void beforeSuite() throws Exception {
+  public void beforeSuite()
+      throws Exception {
     if (!_init) {
+      setupZooKeepers();
+
       // TODO: use logging.properties file to config java.util.logging.Logger 
levels
       java.util.logging.Logger topJavaLogger = 
java.util.logging.Logger.getLogger("");
       topJavaLogger.setLevel(Level.WARNING);
@@ -260,12 +228,12 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
       HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
 
       clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _gZkClient = DedicatedZkClientFactory
-          .getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
+      _gZkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), 
clientConfig);
 
       clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _gZkClientTestNS = DedicatedZkClientFactory
-          .getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);
+      _gZkClientTestNS = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), 
clientConfig);
 
       _gSetupTool = new ClusterSetup(_gZkClient);
       _configAccessor = new ConfigAccessor(_gZkClient);
@@ -274,14 +242,14 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
 
       // wait for the web service to start
       Thread.sleep(100);
-
-      setup();
+      setupHelixResources();
       _init = true;
     }
   }
 
   @AfterSuite
-  public void afterSuite() throws Exception {
+  public void afterSuite()
+      throws Exception {
     // tear down orphan-ed threads
     for (ClusterControllerManager cm : _clusterControllerManagers) {
       if (cm != null && cm.isConnected()) {
@@ -289,7 +257,7 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
       }
     }
 
-    for (MockParticipantManager mm: _mockParticipantManagers) {
+    for (MockParticipantManager mm : _mockParticipantManagers) {
       if (mm != null && mm.isConnected()) {
         mm.syncStop();
       }
@@ -315,16 +283,57 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
       _zkServerTestNS = null;
     }
 
-    // Stop all ZkServers
-    _zkServerMap.forEach((zkAddr, zkServer) -> 
TestHelper.stopZkServer(zkServer));
-
     if (_helixRestServer != null) {
       _helixRestServer.shutdown();
       _helixRestServer = null;
     }
+
+    // Stop all ZkServers
+    ZK_SERVER_MAP.forEach((zkAddr, zkServer) -> 
TestHelper.stopZkServer(zkServer));
+  }
+
+  /**
+   * Starts the ZooKeeper servers used by the tests and records each one in ZK_SERVER_MAP under
+   * its address.
+   * <p>
+   * The servers at ZK_ADDR and _zkAddrTestNS are started if their fields are still null. When
+   * the system property named by MULTI_ZK_PROPERTY_KEY is "true" (case-insensitive) and the
+   * property named by NUM_ZK_PROPERTY_KEY is set, additional servers are started on ports
+   * ZK_START_PORT + 2 up to (but excluding) ZK_START_PORT + numZk.
+   * Any exception is turned into a test failure that carries the original exception as cause.
+   */
+  private void setupZooKeepers() {
+    // start zk
+    try {
+      if (_zkServer == null) {
+        _zkServer = TestHelper.startZkServer(ZK_ADDR);
+        Assert.assertNotNull(_zkServer);
+        ZK_SERVER_MAP.put(ZK_ADDR, _zkServer);
+        ZKClientPool.reset();
+      }
+
+      if (_zkServerTestNS == null) {
+        _zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS);
+        Assert.assertNotNull(_zkServerTestNS);
+        ZK_SERVER_MAP.put(_zkAddrTestNS, _zkServerTestNS);
+        ZKClientPool.reset();
+      }
+    } catch (Exception e) {
+      // Keep the cause so the failure report shows where startup actually broke
+      Assert.fail(String.format("Failed to start ZK servers: %s", e.toString()), e);
+    }
+
+    // Start additional ZKs in a multi-ZK setup if applicable
+    // (parseBoolean is null-safe and case-insensitive, matching the previous explicit check)
+    if (Boolean.parseBoolean(System.getProperty(MULTI_ZK_PROPERTY_KEY))) {
+      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
+      if (numZkFromConfig != null) {
+        try {
+          int numZkFromConfigInt = Integer.parseInt(numZkFromConfig);
+          // Start (numZkFromConfigInt - 2) ZooKeepers; the first two addresses are handled above
+          for (int i = 2; i < numZkFromConfigInt; i++) {
+            String zkAddr = ZK_PREFIX + (ZK_START_PORT + i);
+            ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+            Assert.assertNotNull(zkServer);
+            ZK_SERVER_MAP.put(zkAddr, zkServer);
+          }
+        } catch (Exception e) {
+          // Also reached for a non-numeric numZk value; the cause makes that distinguishable
+          Assert.fail("Failed to create multiple ZooKeepers!", e);
+        }
+      }
+    }
+  }
 
-  protected void setup() throws Exception {
+  protected void setupHelixResources() {
     _clusters = createClusters(3);
     _gSetupTool.addCluster(_superCluster, true);
     _gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
@@ -347,7 +356,7 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, 
STOPPABLE_INSTANCES);
   }
 
-  protected Set<String> createInstances(String cluster, int numInstances) 
throws Exception {
+  protected Set<String> createInstances(String cluster, int numInstances) {
     Set<String> instances = new HashSet<>();
     for (int i = 0; i < numInstances; i++) {
       String instanceName = cluster + "localhost_" + (12918 + i);
@@ -362,7 +371,8 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     for (int i = 0; i < numResources; i++) {
       String resource = cluster + "_db_" + i;
       _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, 
"MasterSlave");
-      IdealState idealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      IdealState idealState =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
       idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
       _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, 
resource, idealState);
       _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
@@ -390,12 +400,8 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     int i = 0;
     for (String instance : instances) {
       MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
cluster, instance);
-      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, 
TaskFactory>();
-      taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-        @Override public Task createNewTask(TaskCallbackContext context) {
-          return new MockTask(context);
-        }
-      });
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+      taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
       StateMachineEngine stateMachineEngine = 
participant.getStateMachineEngine();
       stateMachineEngine.registerStateModelFactory("Task",
           new TaskStateModelFactory(participant, taskFactoryReg));
@@ -431,8 +437,9 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
 
   protected Map<String, Workflow> createWorkflows(String cluster, int 
numWorkflows) {
     Map<String, Workflow> workflows = new HashMap<>();
-    HelixPropertyStore<ZNRecord> propertyStore = new 
ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
-        PropertyPathBuilder.propertyStore(cluster), null);
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) 
_baseAccessor,
+            PropertyPathBuilder.propertyStore(cluster), null);
 
     for (int i = 0; i < numWorkflows; i++) {
       Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i);
@@ -489,11 +496,13 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     return jobCfgs;
   }
 
-  protected static ZNRecord toZNRecord(String data) throws IOException {
+  protected static ZNRecord toZNRecord(String data)
+      throws IOException {
     return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
   }
 
-  protected String get(String uri, Map<String, String> queryParams, int 
expectedReturnStatus, boolean expectBodyReturned) {
+  protected String get(String uri, Map<String, String> queryParams, int 
expectedReturnStatus,
+      boolean expectBodyReturned) {
     WebTarget webTarget = target(uri);
     if (queryParams != null) {
       for (Map.Entry<String, String> entry : queryParams.entrySet()) {
@@ -552,7 +561,8 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest {
     return new TaskDriver(_gZkClient, clusterName);
   }
 
-  private void preSetupForParallelInstancesStoppableTest(String clusterName, 
List<String> instances) {
+  private void preSetupForParallelInstancesStoppableTest(String clusterName,
+      List<String> instances) {
     _gSetupTool.addCluster(clusterName, true);
     ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
     clusterConfig.setFaultZoneType("helixZoneId");

Reply via email to