Repository: helix
Updated Branches:
  refs/heads/master 281f5d1ec -> 7bb55742e


http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index b57ca87..9c18d09 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -57,6 +57,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -70,8 +71,8 @@ public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
   private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
-  protected final IZkConnection _connection;
-  protected final long _operationRetryTimeoutInMillis;
+  private final IZkConnection _connection;
+  private final long _operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener =
       new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> 
_dataListener =
@@ -87,7 +88,6 @@ public class ZkClient implements Watcher {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
 
-
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -130,7 +130,6 @@ public class ZkClient implements Watcher {
     }
   }
 
-
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long 
operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -140,6 +139,7 @@ public class ZkClient implements Watcher {
     _connection = zkConnection;
     _pathBasedZkSerializer = zkSerializer;
     _operationRetryTimeoutInMillis = operationRetryTimeout;
+
     connect(connectionTimeout, this);
 
     // initiate monitor
@@ -510,7 +510,7 @@ public class ZkClient implements Watcher {
       String actualPath = retryUntilConnected(new Callable<String>() {
         @Override
         public String call() throws Exception {
-          return _connection.create(path, data, acl, mode);
+          return getConnection().create(path, data, acl, mode);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -695,7 +695,7 @@ public class ZkClient implements Watcher {
       List<String> children = retryUntilConnected(new Callable<List<String>>() 
{
         @Override
         public List<String> call() throws Exception {
-          return _connection.getChildren(path, watch);
+          return getConnection().getChildren(path, watch);
         }
       });
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -737,7 +737,7 @@ public class ZkClient implements Watcher {
       boolean exists = retryUntilConnected(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
-          return _connection.exists(path, watch);
+          return getConnection().exists(path, watch);
         }
       });
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -759,7 +759,7 @@ public class ZkClient implements Watcher {
       Stat stat = retryUntilConnected(new Callable<Stat>() {
         @Override
         public Stat call() throws Exception {
-          Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, 
false);
+          Stat stat = ((ZkConnection) 
getConnection()).getZookeeper().exists(path, false);
           return stat;
         }
       });
@@ -776,14 +776,14 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void processStateChanged(WatchedEvent event) {
+  protected void processStateChanged(WatchedEvent event) {
     LOG.info("zookeeper state changed (" + event.getState() + ")");
     setCurrentState(event.getState());
     if (getShutdownTrigger()) {
       return;
     }
     fireStateChangedEvent(event.getState());
-    if (event.getState() == KeeperState.Expired) {
+    if (isManagingZkConnection() && event.getState() == KeeperState.Expired) {
       reconnectOnExpiring();
     }
   }
@@ -794,7 +794,7 @@ public class ZkClient implements Watcher {
         new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
 
     Exception reconnectException = new ZkException("Shutdown triggered.");
-    while (!_closed) {
+    while (!isClosed()) {
       try {
         reconnect();
         fireNewSessionEvents();
@@ -820,6 +820,19 @@ public class ZkClient implements Watcher {
     fireSessionEstablishmentError(reconnectException);
   }
 
+  private void reconnect() {
+    getEventLock().lock();
+    try {
+      IZkConnection connection = getConnection();
+      connection.close();
+      connection.connect(this);
+    } catch (InterruptedException e) {
+      throw new ZkInterruptedException(e);
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
   private void fireNewSessionEvents() {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("New session event sent to " + 
stateListener) {
@@ -831,7 +844,7 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void fireStateChangedEvent(final KeeperState state) {
+  protected void fireStateChangedEvent(final KeeperState state) {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("State changed to " + state + " sent to " 
+ stateListener) {
 
@@ -1073,7 +1086,7 @@ public class ZkClient implements Watcher {
     }
     final long operationStartTime = System.currentTimeMillis();
     while (true) {
-      if (_closed) {
+      if (isClosed()) {
         throw new IllegalStateException("ZkClient already closed!");
       }
       try {
@@ -1147,7 +1160,7 @@ public class ZkClient implements Watcher {
 
           @Override
           public Object call() throws Exception {
-            _connection.delete(path);
+            getConnection().delete(path);
             return null;
           }
         });
@@ -1226,7 +1239,7 @@ public class ZkClient implements Watcher {
       data = retryUntilConnected(new Callable<byte[]>() {
 
         @Override public byte[] call() throws Exception {
-          return _connection.readData(path, stat, watch);
+          return getConnection().readData(path, stat, watch);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.READ);
@@ -1299,7 +1312,7 @@ public class ZkClient implements Watcher {
       checkDataSizeLimit(data);
       final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
         @Override public Object call() throws Exception {
-          return _connection.writeDataReturnStat(path, data, expectedVersion);
+          return getConnection().writeDataReturnStat(path, data, 
expectedVersion);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -1326,7 +1339,7 @@ public class ZkClient implements Watcher {
     final byte[] data = (datat == null ? null : serialize(datat, path));
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().create(path, data, 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        ((ZkConnection) getConnection()).getZookeeper().create(path, data, 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
             // Arrays.asList(DEFAULT_ACL),
             mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
@@ -1342,7 +1355,7 @@ public class ZkClient implements Watcher {
     final byte[] data = serialize(datat, path);
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().setData(path, data, 
version, cb,
+        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, 
version, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
         return null;
@@ -1354,7 +1367,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().getData(path, null, cb,
+        ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
true));
         return null;
       }
@@ -1365,7 +1378,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().exists(path, null, cb,
+        ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
true));
         return null;
       }
@@ -1376,7 +1389,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb,
+        ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
false));
         return null;
       }
@@ -1394,7 +1407,7 @@ public class ZkClient implements Watcher {
   public void watchForData(final String path) {
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        _connection.exists(path, true);
+        getConnection().exists(path, true);
         return null;
       }
     });
@@ -1433,7 +1446,7 @@ public class ZkClient implements Watcher {
   public void addAuthInfo(final String scheme, final byte[] auth) {
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        _connection.addAuthInfo(scheme, auth);
+        getConnection().addAuthInfo(scheme, auth);
         return null;
       }
     });
@@ -1453,22 +1466,34 @@ public class ZkClient implements Watcher {
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException 
{
-    if (_closed) {
+    if (isClosed()) {
       throw new IllegalStateException("ZkClient already closed!");
     }
     boolean started = false;
     acquireEventLock();
     try {
       setShutdownTrigger(false);
-      _eventThread = new ZkEventThread(_connection.getServers());
+
+      IZkConnection zkConnection = getConnection();
+      _eventThread = new ZkEventThread(zkConnection.getServers());
       _eventThread.start();
-      _connection.connect(watcher);
 
-      LOG.debug("Awaiting connection to Zookeeper server");
-      if (!waitUntilConnected(maxMsToWaitUntilConnected, 
TimeUnit.MILLISECONDS)) {
-        throw new ZkTimeoutException(
-            "Unable to connect to zookeeper server within timeout: " + 
maxMsToWaitUntilConnected);
+      if (isManagingZkConnection()) {
+        zkConnection.connect(watcher);
+        LOG.debug("Awaiting connection to Zookeeper server");
+        if (!waitUntilConnected(maxMsToWaitUntilConnected, 
TimeUnit.MILLISECONDS)) {
+          throw new ZkTimeoutException(
+              "Unable to connect to zookeeper server within timeout: " + 
maxMsToWaitUntilConnected);
+        }
+      } else {
+        // if the client is not managing connection, the input connection is 
supposed to connect.
+        if (isConnectionClosed()) {
+          throw new HelixException(
+              "Unable to connect to zookeeper server with the specified 
ZkConnection");
+        }
+        setCurrentState(KeeperState.SyncConnected);
       }
+
       started = true;
     } finally {
       getEventLock().unlock();
@@ -1484,7 +1509,7 @@ public class ZkClient implements Watcher {
   public long getCreationTime(String path) {
     acquireEventLock();
     try {
-      return _connection.getCreateTime(path);
+      return getConnection().getCreateTime(path);
     } catch (KeeperException e) {
       throw ZkException.create(e);
     } catch (InterruptedException e) {
@@ -1495,7 +1520,7 @@ public class ZkClient implements Watcher {
   }
 
   public String getServers() {
-    return _connection.getServers();
+    return getConnection().getServers();
   }
 
   /**
@@ -1509,15 +1534,18 @@ public class ZkClient implements Watcher {
       LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
     }
     getEventLock().lock();
+    IZkConnection connection = getConnection();
     try {
-      if (_connection == null || _closed) {
+      if (connection == null || _closed) {
         return;
       }
-      LOG.info("Closing zkclient: " + ((ZkConnection) 
_connection).getZookeeper());
       setShutdownTrigger(true);
       _eventThread.interrupt();
       _eventThread.join(2000);
-      _connection.close();
+      if (isManagingZkConnection()) {
+        LOG.info("Closing zkclient: " + ((ZkConnection) 
connection).getZookeeper());
+        connection.close();
+      }
       _closed = true;
 
       // send state change notification to unlock any wait
@@ -1529,7 +1557,7 @@ public class ZkClient implements Watcher {
        * Workaround for HELIX-264: calling ZkClient#close() in its own 
eventThread context will
        * throw ZkInterruptedException and skip ZkConnection#close()
        */
-      if (_connection != null) {
+      if (connection != null) {
         try {
           /**
            * ZkInterruptedException#construct() honors InterruptedException by 
calling
@@ -1537,7 +1565,9 @@ public class ZkClient implements Watcher {
            * zk-connection
            */
           Thread.interrupted();
-          _connection.close();
+          if (isManagingZkConnection()) {
+            connection.close();
+          }
           /**
            * restore interrupted status of current thread
            */
@@ -1556,26 +1586,20 @@ public class ZkClient implements Watcher {
   }
 
   public boolean isClosed() {
-    return _closed;
-  }
-
-  public boolean isConnectionClosed() {
-    return (_connection == null || _connection.getZookeeperState() == null ||
-        !_connection.getZookeeperState().isAlive());
-  }
-
-  private void reconnect() {
-    getEventLock().lock();
     try {
-      _connection.close();
-      _connection.connect(this);
-    } catch (InterruptedException e) {
-      throw new ZkInterruptedException(e);
+      getEventLock().lock();
+      return _closed;
     } finally {
       getEventLock().unlock();
     }
   }
 
+  public boolean isConnectionClosed() {
+    IZkConnection connection = getConnection();
+    return (connection == null || connection.getZookeeperState() == null ||
+        !connection.getZookeeperState().isAlive());
+  }
+
   public void setShutdownTrigger(boolean triggerState) {
     _shutdownTriggered = triggerState;
   }
@@ -1605,11 +1629,29 @@ public class ZkClient implements Watcher {
     return retryUntilConnected(new Callable<List<OpResult>>() {
 
       @Override public List<OpResult> call() throws Exception {
-        return _connection.multi(ops);
+        return getConnection().multi(ops);
       }
     });
   }
 
+  /**
+   * @return true if this ZkClient is managing the ZkConnection.
+   */
+  protected boolean isManagingZkConnection() {
+    return true;
+  }
+
+  public long getSessionId() {
+    ZkConnection zkConnection = ((ZkConnection) getConnection());
+    ZooKeeper zk = zkConnection.getZookeeper();
+    if (zk == null) {
+      throw new HelixException(
+          "ZooKeeper connection information is not available now. ZkClient 
might be disconnected.");
+    } else {
+      return zkConnection.getZookeeper().getSessionId();
+    }
+  }
+
   // operations to update monitor's counters
   private void record(String path, byte[] data, long startTimeMilliSec, 
ZkClientMonitor.AccessType accessType) {
     if (_monitor != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
 
b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index d458c52..794e9e1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -31,7 +31,8 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.slf4j.Logger;
@@ -129,14 +130,15 @@ public class HelixCustomCodeRunner {
 
     StateMachineEngine stateMach = _manager.getStateMachineEngine();
     stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, 
_resourceName);
-    ZkClient zkClient = null;
+    HelixZkClient zkClient = null;
     try {
       // manually add ideal state for participant leader using LeaderStandby
       // model
+      HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
+      clientConfig.setZkSerializer(new ZNRecordSerializer());
+      zkClient = SharedZkClientFactory
+          .getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
 
-      zkClient =
-          new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       HelixDataAccessor accessor =
           new ZKHelixDataAccessor(_manager.getClusterName(), new 
ZkBaseDataAccessor<ZNRecord>(
               zkClient));
@@ -161,9 +163,7 @@ public class HelixCustomCodeRunner {
       LOG.info("Set idealState for participantLeader:" + _resourceName + ", 
idealState:"
           + idealState);
     } finally {
-      if (zkClient != null && zkClient.getConnection() != null)
-
-      {
+      if (zkClient != null && !zkClient.isClosed()) {
         zkClient.close();
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java 
b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
index 80a7820..b1d6582 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -24,15 +24,17 @@ import java.util.UUID;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.Message;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 
 public class MessagePoster {
   public void post(String zkServer, Message message, String clusterName, 
String instanceName) {
-    ZkClient client = new ZkClient(zkServer);
+    HelixZkClient client = 
SharedZkClientFactory.getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(
+        zkServer));
     client.setZkSerializer(new ZNRecordSerializer());
     String path = PropertyPathBuilder.instanceMessage(clusterName, 
instanceName, message.getId());
     client.delete(path);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java 
b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index dd6f3a9..908bba5 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -35,13 +35,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.PropertyJsonComparator;
 import org.apache.helix.store.PropertyJsonSerializer;
 import org.apache.helix.store.PropertyStoreException;
 import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * a test is structured logically as a list of commands a command has three 
parts: COMMAND
@@ -747,10 +749,11 @@ public class TestExecutor {
       String zkAddr, CountDownLatch countDown) {
 
     final Map<TestCommand, Boolean> testResults = new 
ConcurrentHashMap<TestCommand, Boolean>();
-    ZkClient zkClient = null;
 
-    zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = SharedZkClientFactory
+        .getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
 
     // sort on trigger's start time, stable sort
     Collections.sort(commandList, new Comparator<TestCommand>() {
@@ -765,7 +768,7 @@ public class TestExecutor {
 
       TestTrigger trigger = command._trigger;
       command._startTimestamp = System.currentTimeMillis() + 
trigger._startTime;
-      new Thread(new ExecuteCommand(command._startTimestamp, command, 
countDown, zkClient,
+      new Thread(new ExecuteCommand(command._startTimestamp, command, 
countDown, (ZkClient) zkClient,
           testResults)).start();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java 
b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
index c171b73..cf7f22e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
@@ -38,14 +38,15 @@ import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.helix.manager.zk.ByteArraySerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 
 /**
  * Dumps the Zookeeper file structure on to Disk
  */
 @SuppressWarnings("static-access")
 public class ZKDumper {
-  private ZkClient client;
+  private HelixZkClient client;
   private FilenameFilter filter;
   static Options options;
   private String suffix = "";
@@ -110,7 +111,8 @@ public class ZKDumper {
   }
 
   public ZKDumper(String zkAddress) {
-    client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    client = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress));
 
     ZkSerializer zkSerializer = new ByteArraySerializer();
     client.setZkSerializer(zkSerializer);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java 
b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
index 5bd955a..805847c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
@@ -37,11 +37,12 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.manager.zk.ByteArraySerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tool for copying a zk/file path to another zk/file path
@@ -99,7 +100,7 @@ public class ZkCopy {
    * @param dstRootPath
    * @param paths
    */
-  private static void copy(ZkClient srcClient, String srcRootPath, ZkClient 
dstClient,
+  private static void copy(HelixZkClient srcClient, String srcRootPath, 
HelixZkClient dstClient,
       String dstRootPath, List<String> paths) {
     BaseDataAccessor<Object> srcAccessor = new 
ZkBaseDataAccessor<Object>(srcClient);
     List<String> readPaths = new ArrayList<String>();
@@ -146,7 +147,8 @@ public class ZkCopy {
     }
   }
 
-  private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient 
dstClient, String dstRootPath) {
+  private static void zkCopy(HelixZkClient srcClient, String srcRootPath, 
HelixZkClient dstClient,
+      String dstRootPath) {
     // Strip off tailing "/"
     if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) {
       srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1);
@@ -218,21 +220,21 @@ public class ZkCopy {
       String srcZkAddr = srcUri.getAuthority();
       String dstZkAddr = dstUri.getAuthority();
 
-      ZkClient srcClient = null;
-      ZkClient dstClient = null;
+      HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
+      HelixZkClient srcClient = null;
+      HelixZkClient dstClient = null;
       try {
         if (srcZkAddr.equals(dstZkAddr)) {
-          srcClient =
-              dstClient =
-                  new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                      ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ByteArraySerializer());
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          srcClient = dstClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), 
clientConfig);
         } else {
-          srcClient =
-              new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ByteArraySerializer());
-          dstClient =
-              new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new 
ByteArraySerializer());
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          srcClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), 
clientConfig);
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          dstClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(dstZkAddr), 
clientConfig);
         }
         String srcPath = srcUri.getPath();
         String dstPath = dstUri.getPath();

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index d3f447a..63d87eb 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -427,7 +427,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _zkclient;
+      return (ZkClient) _zkclient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index ffa2cb2..96f4a88 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -89,7 +89,7 @@ public class ClusterControllerManager extends ZKHelixManager 
implements Runnable
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 1cce08d..b186a1a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -83,7 +83,7 @@ public class ClusterDistributedController extends 
ZKHelixManager implements Runn
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 2bd2630..362709a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -128,7 +128,7 @@ public class MockParticipantManager extends ZKHelixManager 
implements Runnable,
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
new file mode 100644
index 0000000..d0cf004
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -0,0 +1,294 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRawZkClient extends ZkUnitTestBase {
+  private static Logger LOG = LoggerFactory.getLogger(TestRawZkClient.class);
+
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.close();
+  }
+
+  @Test()
+  void testGetStat() {
+    String path = "/tmp/getStatTest";
+    _zkClient.deleteRecursively(path);
+
+    Stat stat, newStat;
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNull(stat);
+    _zkClient.createPersistent(path, true);
+
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNotNull(stat);
+
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertEquals(stat, newStat);
+
+    _zkClient.writeData(path, new ZNRecord("Test"));
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertNotSame(stat, newStat);
+  }
+
+  @Test()
+  void testSessionExpire() throws Exception {
+    IZkStateListener listener = new IZkStateListener() {
+
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception {
+        System.out.println("In Old connection New state " + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        System.out.println("In Old connection New session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws 
Exception {
+      }
+    };
+
+    _zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+    ZooKeeper zookeeper = connection.getZookeeper();
+    System.out.println("old sessionId= " + zookeeper.getSessionId());
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        System.out.println("In New connection In process event:" + event);
+      }
+    };
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), 
watcher,
+            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+    Thread.sleep(3000);
+    System.out.println("New sessionId= " + newZookeeper.getSessionId());
+    Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = ((ZkConnection) _zkClient.getConnection());
+    zookeeper = connection.getZookeeper();
+    System.out.println("After session expiry sessionId= " + 
zookeeper.getSessionId());
+  }
+
+  @Test(expectedExceptions = HelixException.class, 
expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
+  void testDataSizeLimit() {
+    ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
+    _zkClient.writeData("/test", data, -1);
+  }
+
+  @Test
+  public void testZkClientMonitor() throws Exception {
+    final String TEST_TAG = "test_monitor";
+    final String TEST_KEY = "test_key";
+    final String TEST_DATA = "testData";
+    final String TEST_ROOT = "/my_cluster/IDEALSTATES";
+    final String TEST_NODE = "/test_zkclient_monitor";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    ZkClient.Builder builder = new ZkClient.Builder();
+    
builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
+        .setMonitorRootPathOnly(false);
+    ZkClient zkClient = builder.build();
+
+    final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, 
TEST_PATH).length;
+
+    if (_zkClient.exists(TEST_PATH)) {
+      _zkClient.delete(TEST_PATH);
+    }
+    if (!_zkClient.exists(TEST_ROOT)) {
+      _zkClient.createPersistent(TEST_ROOT, true);
+    }
+
+    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+
+    ObjectName name = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
+    ObjectName rootname = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, 
ZkClientPathMonitor.MONITOR_PATH,
+            "Root");
+    ObjectName idealStatename = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, 
ZkClientPathMonitor.MONITOR_PATH,
+            "IdealStates");
+    Assert.assertTrue(beanServer.isRegistered(rootname));
+    Assert.assertTrue(beanServer.isRegistered(idealStatename));
+
+    // Test exists
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max"), 0);
+    zkClient.exists(TEST_ROOT);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max") >= 0);
+
+    // Test create
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteTotalLatencyCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter"),
+        0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max"), 0);
+    zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"),
+        TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"),
+        TEST_DATA_SIZE);
+    long origWriteTotalLatencyCounter =
+        (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
+    Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max") >= 0);
+    long origIdealStatesWriteTotalLatencyCounter =
+        (long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter");
+    Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max") >= 0);
+
+    // Test read
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"), 0);
+    long origReadTotalLatencyCounter =
+        (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
+    long origIdealStatesReadTotalLatencyCounter =
+        (long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter");
+    Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max"), 0);
+    zkClient.readData(TEST_PATH, new Stat());
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 2);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter")
+        >= origReadTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter")
+        >= origIdealStatesReadTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max") >= 0);
+    zkClient.getChildren(TEST_PATH);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 3);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    zkClient.getStat(TEST_PATH);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 4);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 3);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 5);
+
+    ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.ExistsCallbackHandler();
+    zkClient.asyncExists(TEST_PATH, callbackHandler);
+    callbackHandler.waitForSuccess();
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 6);
+
+    // Test write
+    zkClient.writeData(TEST_PATH, TEST_DATA);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"),
+        TEST_DATA_SIZE * 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"),
+        TEST_DATA_SIZE * 2);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteTotalLatencyCounter")
+        >= origWriteTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter")
+        >= origIdealStatesWriteTotalLatencyCounter);
+
+    // Test data change count
+    final Lock lock = new ReentrantLock();
+    final Condition callbackFinish = lock.newCondition();
+    zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws 
Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+        lock.lock();
+        try {
+          callbackFinish.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+    });
+    lock.lock();
+    _zkClient.delete(TEST_PATH);
+    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+    Assert.assertEquals((long) beanServer.getAttribute(name, 
"DataChangeEventCounter"), 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
deleted file mode 100644
index a18dd29..0000000
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
+++ /dev/null
@@ -1,294 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
-import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClient extends ZkUnitTestBase {
-  private static Logger LOG = LoggerFactory.getLogger(TestZkClient.class);
-
-  ZkClient _zkClient;
-
-  @BeforeClass
-  public void beforeClass() {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.close();
-  }
-
-  @Test()
-  void testGetStat() {
-    String path = "/tmp/getStatTest";
-    _zkClient.deleteRecursively(path);
-
-    Stat stat, newStat;
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNull(stat);
-    _zkClient.createPersistent(path, true);
-
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNotNull(stat);
-
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertEquals(stat, newStat);
-
-    _zkClient.writeData(path, new ZNRecord("Test"));
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertNotSame(stat, newStat);
-  }
-
-  @Test()
-  void testSessionExpire() throws Exception {
-    IZkStateListener listener = new IZkStateListener() {
-
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        System.out.println("In Old connection New state " + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        System.out.println("In Old connection New session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws 
Exception {
-      }
-    };
-
-    _zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    System.out.println("old sessionId= " + zookeeper.getSessionId());
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        System.out.println("In New connection In process event:" + event);
-      }
-    };
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), 
watcher,
-            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
-    Thread.sleep(3000);
-    System.out.println("New sessionId= " + newZookeeper.getSessionId());
-    Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = ((ZkConnection) _zkClient.getConnection());
-    zookeeper = connection.getZookeeper();
-    System.out.println("After session expiry sessionId= " + 
zookeeper.getSessionId());
-  }
-
-  @Test(expectedExceptions = HelixException.class, 
expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
-  void testDataSizeLimit() {
-    ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
-    _zkClient.writeData("/test", data, -1);
-  }
-
-  @Test
-  public void testZkClientMonitor() throws Exception {
-    final String TEST_TAG = "test_monitor";
-    final String TEST_KEY = "test_key";
-    final String TEST_DATA = "testData";
-    final String TEST_ROOT = "/my_cluster/IDEALSTATES";
-    final String TEST_NODE = "/test_zkclient_monitor";
-    final String TEST_PATH = TEST_ROOT + TEST_NODE;
-
-    ZkClient.Builder builder = new ZkClient.Builder();
-    
builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
-        .setMonitorRootPathOnly(false);
-    ZkClient zkClient = builder.build();
-
-    final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, 
TEST_PATH).length;
-
-    if (_zkClient.exists(TEST_PATH)) {
-      _zkClient.delete(TEST_PATH);
-    }
-    if (!_zkClient.exists(TEST_ROOT)) {
-      _zkClient.createPersistent(TEST_ROOT, true);
-    }
-
-    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
-
-    ObjectName name = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
-    ObjectName rootname = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, 
ZkClientPathMonitor.MONITOR_PATH,
-            "Root");
-    ObjectName idealStatename = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), 
ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, 
ZkClientPathMonitor.MONITOR_PATH,
-            "IdealStates");
-    Assert.assertTrue(beanServer.isRegistered(rootname));
-    Assert.assertTrue(beanServer.isRegistered(idealStatename));
-
-    // Test exists
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max"), 0);
-    zkClient.exists(TEST_ROOT);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter") >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max") >= 0);
-
-    // Test create
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter"),
-        0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max"), 0);
-    zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"),
-        TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"),
-        TEST_DATA_SIZE);
-    long origWriteTotalLatencyCounter =
-        (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
-    Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteLatencyGauge.Max") >= 0);
-    long origIdealStatesWriteTotalLatencyCounter =
-        (long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter");
-    Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max") >= 0);
-
-    // Test read
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"), 0);
-    long origReadTotalLatencyCounter =
-        (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
-    long origIdealStatesReadTotalLatencyCounter =
-        (long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter");
-    Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max"), 0);
-    zkClient.readData(TEST_PATH, new Stat());
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 2);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter")
-        >= origReadTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadTotalLatencyCounter")
-        >= origIdealStatesReadTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max") >= 0);
-    zkClient.getChildren(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 3);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    zkClient.getStat(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 4);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 3);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 5);
-
-    ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
-        new ZkAsyncCallbacks.ExistsCallbackHandler();
-    zkClient.asyncExists(TEST_PATH, callbackHandler);
-    callbackHandler.waitForSuccess();
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 6);
-
-    // Test write
-    zkClient.writeData(TEST_PATH, TEST_DATA);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"WriteBytesCounter"),
-        TEST_DATA_SIZE * 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"WriteBytesCounter"),
-        TEST_DATA_SIZE * 2);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"WriteTotalLatencyCounter")
-        >= origWriteTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteTotalLatencyCounter")
-        >= origIdealStatesWriteTotalLatencyCounter);
-
-    // Test data change count
-    final Lock lock = new ReentrantLock();
-    final Condition callbackFinish = lock.newCondition();
-    zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
-      @Override
-      public void handleDataChange(String dataPath, Object data) throws 
Exception {
-      }
-
-      @Override
-      public void handleDataDeleted(String dataPath) throws Exception {
-        lock.lock();
-        try {
-          callbackFinish.signal();
-        } finally {
-          lock.unlock();
-        }
-      }
-    });
-    lock.lock();
-    _zkClient.delete(TEST_PATH);
-    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
-    Assert.assertEquals((long) beanServer.getAttribute(name, 
"DataChangeEventCounter"), 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
index 1f72948..691623e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.zookeeper.WatchedEvent;
@@ -94,7 +95,7 @@ public class TestZkReconnect {
       // 1. shutdown zkServer and check if handler trigger callback
       zkServer.shutdown();
       // Simulate a retry in ZkClient that will not succeed
-      injectExpire(controller._zkclient);
+      injectExpire((ZkClient) controller._zkclient);
       Assert.assertFalse(controller._zkclient.waitUntilConnected(5000, 
TimeUnit.MILLISECONDS));
       // While retrying, onDisconnectedFlag = false
       Assert.assertFalse(onDisconnectedFlag.get());
@@ -102,7 +103,7 @@ public class TestZkReconnect {
       // 2. restart zkServer and check if handler will recover connection
       zkServer.start();
       Assert.assertTrue(controller._zkclient
-          .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, 
TimeUnit.MILLISECONDS));
+          .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, 
TimeUnit.MILLISECONDS));
       Assert.assertTrue(controller.isConnected());
 
       // New propertyStore should be in good state

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
new file mode 100644
index 0000000..67e2731
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -0,0 +1,197 @@
+package org.apache.helix.manager.zk.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixZkClient extends ZkUnitTestBase {
+  final String TEST_NODE = "/test_helix_zkclient";
+
+  @Test public void testZkConnectionManager() {
+    final String TEST_ROOT = "/testZkConnectionManager/IDEALSTATES";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    ZkConnectionManager zkConnectionManager =
+        new ZkConnectionManager(new ZkConnection(ZK_ADDR), 
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+            null);
+    Assert.assertTrue(zkConnectionManager.waitUntilConnected(1, 
TimeUnit.SECONDS));
+
+    // This client can write/read from ZK
+    zkConnectionManager.createPersistent(TEST_PATH, true);
+    zkConnectionManager.writeData(TEST_PATH, "Test");
+    Assert.assertTrue(zkConnectionManager.readData(TEST_PATH) != null);
+    zkConnectionManager.deleteRecursively(TEST_ROOT);
+
+    // This client can be shared, and cannot close when sharing
+    SharedZkClient sharedZkClient =
+        new SharedZkClient(zkConnectionManager, new 
HelixZkClient.ZkClientConfig(), null);
+    try {
+      zkConnectionManager.close();
+      Assert.fail("Dedicated ZkClient cannot be closed while sharing!");
+    } catch (HelixException hex) {
+      // expected
+    }
+
+    // This client can be closed normally when sharing ends
+    sharedZkClient.close();
+    Assert.assertTrue(sharedZkClient.isClosed());
+    Assert.assertFalse(sharedZkClient.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+
+    zkConnectionManager.close();
+    Assert.assertTrue(zkConnectionManager.isClosed());
+    Assert.assertFalse(zkConnectionManager.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+
+    // Sharing a closed dedicated ZkClient shall fail
+    try {
+      new SharedZkClient(zkConnectionManager, new 
HelixZkClient.ZkClientConfig(), null);
+      Assert.fail("Sharing a closed dedicated ZkClient shall fail.");
+    } catch (HelixException hex) {
+      // expected
+    }
+  }
+
+  @Test(dependsOnMethods = "testZkConnectionManager") public void 
testSharingZkClient()
+      throws Exception {
+    final String TEST_ROOT = "/testSharedZkClient/IDEALSTATES";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    // A factory just for this tests, this for avoiding the impact from other 
tests running in parallel.
+    final SharedZkClientFactory testFactory = new SharedZkClientFactory();
+
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(ZK_ADDR);
+    HelixZkClient sharedZkClientA =
+        testFactory.buildZkClient(connectionConfig, new 
HelixZkClient.ZkClientConfig());
+    Assert.assertTrue(sharedZkClientA.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    HelixZkClient sharedZkClientB =
+        testFactory.buildZkClient(connectionConfig, new 
HelixZkClient.ZkClientConfig());
+    Assert.assertTrue(sharedZkClientB.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    // client A and B is sharing the same session.
+    Assert.assertEquals(sharedZkClientA.getSessionId(), 
sharedZkClientB.getSessionId());
+    long sessionId = sharedZkClientA.getSessionId();
+
+    final int[] notificationCountA = { 0, 0 };
+    sharedZkClientA.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+      @Override public void handleDataChange(String s, Object o) {
+        notificationCountA[0]++;
+      }
+
+      @Override public void handleDataDeleted(String s) {
+        notificationCountA[1]++;
+      }
+    });
+    final int[] notificationCountB = { 0, 0 };
+    sharedZkClientB.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+      @Override public void handleDataChange(String s, Object o) {
+        notificationCountB[0]++;
+      }
+
+      @Override public void handleDataDeleted(String s) {
+        notificationCountB[1]++;
+      }
+    });
+
+    // Modify using client A and client B will get notification.
+    sharedZkClientA.createPersistent(TEST_PATH, true);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return notificationCountB[0] == 1;
+      }
+    }, 1000));
+    Assert.assertEquals(notificationCountB[1], 0);
+
+    sharedZkClientA.deleteRecursively(TEST_ROOT);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return notificationCountB[1] == 1;
+      }
+    }, 1000));
+    Assert.assertEquals(notificationCountB[0], 1);
+
+    try {
+      sharedZkClientA.createEphemeral(TEST_PATH, true);
+      Assert.fail("Create Ephemeral nodes using shared client should fail.");
+    } catch (HelixException he) {
+      // expected.
+    }
+
+    sharedZkClientA.close();
+    // Shared client A closed.
+    Assert.assertTrue(sharedZkClientA.isClosed());
+    Assert.assertFalse(sharedZkClientA.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+    // Shared client B still open.
+    Assert.assertFalse(sharedZkClientB.isClosed());
+    Assert.assertTrue(sharedZkClientB.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+
+    // client A cannot do any modify once closed.
+    try {
+      sharedZkClientA.createPersistent(TEST_PATH, true);
+      Assert.fail("Should not be able to create node with a closed client.");
+    } catch (Exception e) {
+      // expected to be here.
+    }
+
+    // Now modify using client B, and client A won't get notification.
+    sharedZkClientB.createPersistent(TEST_PATH, true);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return notificationCountB[0] == 2;
+      }
+    }, 1000));
+    Assert.assertFalse(TestHelper.verify(new TestHelper.Verifier() {
+      @Override public boolean verify() {
+        return notificationCountA[0] == 2;
+      }
+    }, 1000));
+    sharedZkClientB.deleteRecursively(TEST_ROOT);
+
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    sharedZkClientB.close();
+    // Shared client B closed.
+    Assert.assertTrue(sharedZkClientB.isClosed());
+    Assert.assertFalse(sharedZkClientB.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+
+    // Try to create new shared ZkClient, will get a different session
+    HelixZkClient sharedZkClientC =
+        testFactory.buildZkClient(connectionConfig, new 
HelixZkClient.ZkClientConfig());
+    Assert.assertFalse(sessionId == sharedZkClientC.getSessionId());
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    sharedZkClientC.close();
+    // Shared client C closed.
+    Assert.assertTrue(sharedZkClientC.isClosed());
+    Assert.assertFalse(sharedZkClientC.waitUntilConnected(100, 
TimeUnit.MILLISECONDS));
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+  }
+}

Reply via email to