More refactors to deduplicate code between helix's ZkClient and raw ZkClient.

1) Merge all duplicated (and extended) code from helix.manager.zk.ZkClient into
helix.manager.zk.zookeeper.ZkClient.
2) Keep helix.manager.zk.ZkClient as a simple wrapper that provides all the
constructors and the builder.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/310d4766
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/310d4766
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/310d4766

Branch: refs/heads/master
Commit: 310d47660b2c31b569ddfb98058ce5877441095f
Parents: 5ffab62
Author: Lei Xia <[email protected]>
Authored: Tue Dec 19 10:22:37 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:32:57 2018 -0800

----------------------------------------------------------------------
 .../helix/manager/zk/ZkAsyncCallbacks.java      |   6 +-
 .../org/apache/helix/manager/zk/ZkClient.java   | 518 ++----------------
 .../helix/manager/zk/zookeeper/ZkClient.java    | 520 ++++++++++++++-----
 .../monitoring/mbeans/ZkClientMonitor.java      |  40 +-
 .../monitoring/mbeans/TestZkClientMonitor.java  |  10 +-
 5 files changed, 454 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
index 3fd0d0f..bfdf7bb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java
@@ -136,9 +136,11 @@ public class ZkAsyncCallbacks {
         ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx;
         if (zkCtx._monitor != null) {
           if (zkCtx._isRead) {
-            zkCtx._monitor.recordRead(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
+            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+                ZkClientMonitor.AccessType.READ);
           } else {
-            zkCtx._monitor.recordWrite(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec);
+            zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec,
+                ZkClientMonitor.AccessType.WRITE);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 182c77e..618a003 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -21,47 +21,51 @@ package org.apache.helix.manager.zk;
 
 import org.I0Itec.zkclient.IZkConnection;
 import org.I0Itec.zkclient.ZkConnection;
-import org.I0Itec.zkclient.exception.ZkException;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.*;
-import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.JMException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
 
 /**
- * ZKClient does not provide some functionalities, this will be used for quick fixes if
- * any bug found in ZKClient or if we need additional features but can't wait for the new
- * ZkClient jar Ideally we should commit the changes we do here to ZKClient.
+ * This is a wrapper of {@link org.apache.helix.manager.zk.zookeeper.ZkClient},
+ * with additional constructors and builder.
+ *
+ * // TODO: we will need to merge two ZkClient into just one class.
  */
-
 public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
 
   public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
   public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
-  private PathBasedZkSerializer _zkSerializer;
-  private ZkClientMonitor _monitor;
-
-  private ZkClient(IZkConnection connection, int connectionTimeout, long 
operationRetryTimeout,
+  /**
+   *
+   * @param zkConnection
+   *            The Zookeeper connection
+   * @param connectionTimeout
+   *            The connection timeout in milli seconds
+   * @param zkSerializer
+   *            The Zookeeper data serializer
+   * @param operationRetryTimeout
+   *            Most operations are retried in cases like connection loss with the Zookeeper servers. During such failures, this
+   *            <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each
+   *            operation is retried. A value lesser than 0 is considered as
+   *            "retry forever until a connection has been reestablished".
+   * @param monitorType
+   * @param monitorKey
+   * @param monitorInstanceName
+   *            These 3 inputs are used to name JMX monitor bean name for this ZkClient.
+   *            The JMX bean name will be: HelixZkClient.monitorType.monitorKey.monitorInstanceName.
+   * @param monitorRootPathOnly
+   *            Should only stat of access to root path be reported to JMX bean or path-specific stat be reported too.
+   */
+  public ZkClient(IZkConnection zkConnection, int connectionTimeout, long 
operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
-    super(connection, connectionTimeout, new ByteArraySerializer(), 
operationRetryTimeout);
-    init(zkSerializer, monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
+    super(zkConnection, connectionTimeout, operationRetryTimeout, 
zkSerializer, monitorType,
+        monitorKey, monitorInstanceName, monitorRootPathOnly);
   }
 
   public ZkClient(IZkConnection connection, int connectionTimeout,
@@ -128,466 +132,16 @@ public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient {
     this(zkServers, null, null);
   }
 
-  protected void init(PathBasedZkSerializer zkSerializer, String monitorType, 
String monitorKey,
-      String monitorInstanceName, boolean monitorRootPathOnly) {
-    _zkSerializer = zkSerializer;
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls));
-    }
-    try {
-      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null 
&& !monitorType
-          .isEmpty()) {
-        _monitor =
-            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
-      } else {
-        LOG.info("ZkClient monitor key or type is not provided. Skip 
monitoring.");
-      }
-    } catch (JMException e) {
-      LOG.error("Error in creating ZkClientMonitor", e);
-    }
-  }
-
-  @Override
-  public void setZkSerializer(ZkSerializer zkSerializer) {
-    _zkSerializer = new BasicZkSerializer(zkSerializer);
-  }
-
-  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
-    _zkSerializer = zkSerializer;
-  }
-
-  public PathBasedZkSerializer getZkSerializer() {
-    return _zkSerializer;
-  }
-
-  public IZkConnection getConnection() {
-    return _connection;
-  }
-
-  @Override
-  public void close() throws ZkInterruptedException {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
-    }
-    getEventLock().lock();
-    try {
-      if (_connection == null) {
-        return;
-      }
-      LOG.info("Closing zkclient: " + ((ZkConnection) 
_connection).getZookeeper());
-      super.close();
-    } catch (ZkInterruptedException e) {
-      /**
-       * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will
-       * throw ZkInterruptedException and skip ZkConnection#close()
-       */
-      if (_connection != null) {
-        try {
-          /**
-           * ZkInterruptedException#construct() honors InterruptedException by calling
-           * Thread.currentThread().interrupt(); clear it first, so we can 
safely close the
-           * zk-connection
-           */
-          Thread.interrupted();
-          _connection.close();
-          /**
-           * restore interrupted status of current thread
-           */
-          Thread.currentThread().interrupt();
-        } catch (InterruptedException e1) {
-          throw new ZkInterruptedException(e1);
-        }
-      }
-    } finally {
-      getEventLock().unlock();
-      if (_monitor != null) {
-        _monitor.unregister();
-      }
-      LOG.info("Closed zkclient");
-    }
-  }
-
-  public boolean isClosed() {
-    return (_connection == null || !_connection.getZookeeperState().isAlive());
-  }
-
-  public Stat getStat(final String path) {
-    long startT = System.currentTimeMillis();
-    try {
-      Stat stat = retryUntilConnected(new Callable<Stat>() {
-
-        @Override
-        public Stat call() throws Exception {
-          Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, 
false);
-          return stat;
-        }
-      });
-      recordRead(path, null, startT);
-      return stat;
-    } catch (Exception e) {
-      recordReadFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  // override exists(path, watch), so we can record all exists requests
-  @Override
-  protected boolean exists(final String path, final boolean watch) {
-    long startT = System.currentTimeMillis();
-    try {
-      boolean exists = retryUntilConnected(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws Exception {
-          return _connection.exists(path, watch);
-        }
-      });
-      recordRead(path, null, startT);
-      return exists;
-    } catch (Exception e) {
-      recordReadFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  // override getChildren(path, watch), so we can record all getChildren requests
-  @Override
-  protected List<String> getChildren(final String path, final boolean watch) {
-    long startT = System.currentTimeMillis();
-    try {
-      List<String> children = retryUntilConnected(new Callable<List<String>>() 
{
-        @Override
-        public List<String> call() throws Exception {
-          return _connection.getChildren(path, watch);
-        }
-      });
-      recordRead(path, null, startT);
-      return children;
-    } catch (Exception e) {
-      recordReadFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("getChildren, path: " + path + ", time: " + (endT - startT) 
+ " ms");
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T extends Object> T deserialize(byte[] data, String path) {
-    if (data == null) {
-      return null;
-    }
-    return (T) _zkSerializer.deserialize(data, path);
-  }
-
-  // override readData(path, stat, watch), so we can record all read requests
-  @Override
-  @SuppressWarnings("unchecked")
-  protected <T extends Object> T readData(final String path, final Stat stat, 
final boolean watch) {
-    long startT = System.currentTimeMillis();
-    byte[] data = null;
-    try {
-      data = retryUntilConnected(new Callable<byte[]>() {
-
-        @Override
-        public byte[] call() throws Exception {
-          return _connection.readData(path, stat, watch);
-        }
-      });
-      recordRead(path, data, startT);
-      return (T) deserialize(data, path);
-    } catch (Exception e) {
-      recordReadFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("getData, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T extends Object> T readDataAndStat(String path, Stat stat,
-      boolean returnNullIfPathNotExists) {
-    T data = null;
-    try {
-      data = readData(path, stat);
-    } catch (ZkNoNodeException e) {
-      if (!returnNullIfPathNotExists) {
-        throw e;
-      }
-    }
-    return data;
-  }
-
-  public String getServers() {
-    return _connection.getServers();
-  }
-
-  public byte[] serialize(Object data, String path) {
-    return _zkSerializer.serialize(data, path);
-  }
-
-  @Override
-  public void writeData(final String path, Object datat, final int 
expectedVersion) {
-    long startT = System.currentTimeMillis();
-    try {
-      final byte[] data = serialize(datat, path);
-      checkDataSizeLimit(data);
-      retryUntilConnected(new Callable<Object>() {
-
-        @Override public Object call() throws Exception {
-          _connection.writeData(path, data, expectedVersion);
-          return null;
-        }
-      });
-      recordWrite(path, data, startT);
-    } catch (Exception e) {
-      recordWriteFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  public Stat writeDataGetStat(final String path, Object datat, final int 
expectedVersion)
-      throws InterruptedException {
-    long startT = System.currentTimeMillis();
-    try {
-      final byte[] data = _zkSerializer.serialize(datat, path);
-      checkDataSizeLimit(data);
-      Stat stat = retryUntilConnected(new Callable<Stat>() {
-
-        @Override public Stat call() throws Exception {
-          return ((ZkConnection) _connection).getZookeeper()
-              .setData(path, data, expectedVersion);
-        }
-      });
-      recordWrite(path, data, startT);
-      return stat;
-    } catch (Exception e) {
-      recordWriteFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  @Override
-  public String create(final String path, Object datat, final CreateMode mode)
-      throws IllegalArgumentException, ZkException {
-    if (path == null) {
-      throw new NullPointerException("path must not be null.");
-    }
-    long startT = System.currentTimeMillis();
-    try {
-      final byte[] data = datat == null ? null : serialize(datat, path);
-      checkDataSizeLimit(data);
-      String actualPath = retryUntilConnected(new Callable<String>() {
-        @Override
-        public String call() throws Exception {
-          return _connection.create(path, data, mode);
-        }
-      });
-      recordWrite(path, data, startT);
-      return actualPath;
-    } catch (Exception e) {
-      recordWriteFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("create, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-  }
-
-  @Override
-  public boolean delete(final String path) {
-    long startT = System.currentTimeMillis();
-    boolean isDeleted;
-    try {
-      try {
-        retryUntilConnected(new Callable<Object>() {
-
-          @Override
-          public Object call() throws Exception {
-            _connection.delete(path);
-            return null;
-          }
-        });
-        isDeleted = true;
-      } catch (ZkNoNodeException e) {
-        isDeleted = false;
-        LOG.error("Failed to delete path " + path + ", znode does not exist!");
-      }
-      recordWrite(path, null, startT);
-    } catch (Exception e) {
-      recordWriteFailure(path);
-      throw e;
-    } finally {
-      long endT = System.currentTimeMillis();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("delete, path: " + path + ", time: " + (endT - startT) + " 
ms");
-      }
-    }
-    return isDeleted;
-  }
-
-  public void asyncCreate(final String path, Object datat, final CreateMode 
mode,
-      final CreateCallbackHandler cb) {
-    final long startT = System.currentTimeMillis();
-    final byte[] data = (datat == null ? null : serialize(datat, path));
-    retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().create(path, data, 
Ids.OPEN_ACL_UNSAFE,
-            // Arrays.asList(DEFAULT_ACL),
-            mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
-    });
-  }
-
-  public void asyncSetData(final String path, Object datat, final int version,
-      final SetDataCallbackHandler cb) {
-    final long startT = System.currentTimeMillis();
-    final byte[] data = serialize(datat, path);
-    retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().setData(path, data, 
version, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
-    });
-  }
-
-  public void asyncGetData(final String path, final GetDataCallbackHandler cb) 
{
-    final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().getData(path, null, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
true));
-        return null;
-      }
-    });
-  }
-
-  public void asyncExists(final String path, final ExistsCallbackHandler cb) {
-    final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().exists(path, null, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
true));
-        return null;
-      }
-    });
-  }
-
-  public void asyncDelete(final String path, final DeleteCallbackHandler cb) {
-    final long startT = System.currentTimeMillis();
-    retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, 
false));
-        return null;
-      }
-    });
-  }
-
-  public <T> T retryUntilConnected(final Callable<T> callable) {
-    final ZkConnection zkConnection = (ZkConnection) getConnection();
-    return super.retryUntilConnected(new Callable<T>() {
-      @Override
-      public T call() throws Exception {
-        // Validate that the connection is not null before trigger callback
-        if (zkConnection == null || zkConnection.getZookeeper() == null) {
-          throw new IllegalStateException(
-              "ZkConnection is in invalid state! Please close this ZkClient 
and create new client.");
-        }
-        return callable.call();
-      }
-    });
-  }
-
-  private void checkDataSizeLimit(byte[] data) {
-    if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
-      LOG.error("Data size larger than 1M, will not write to zk. Data (first 
1k): "
-          + new String(data).substring(0, 1024));
-      throw new HelixException("Data size larger than 1M");
-    }
-  }
-
-  @Override public void process(WatchedEvent event) {
-    boolean stateChanged = event.getPath() == null;
-    boolean dataChanged = event.getType() == Event.EventType.NodeDataChanged
-        || event.getType() == Event.EventType.NodeDeleted
-        || event.getType() == Event.EventType.NodeCreated
-        || event.getType() == Event.EventType.NodeChildrenChanged;
-
-    if (_monitor != null) {
-      if (stateChanged) {
-        _monitor.increaseStateChangeEventCounter();
-      }
-      if (dataChanged) {
-        _monitor.increaseDataChangeEventCounter();
-      }
-    }
-
-    super.process(event);
-  }
-
-  private void recordRead(String path, byte[] data, long startTimeMilliSec) {
-    if (_monitor != null) {
-      int dataSize = 0;
-      if (data != null) {
-        dataSize = data.length;
-      }
-      _monitor.recordRead(path, dataSize, startTimeMilliSec);
-    }
+  public ZkClient(final String zkServers, final int sessionTimeout, final int 
connectionTimeout,
+      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, 
zkSerializer,
+        operationRetryTimeout);
   }
 
-  private void recordWrite(String path, byte[] data, long startTimeMilliSec) {
-    if (_monitor != null) {
-      int dataSize = 0;
-      if (data != null) {
-        dataSize = data.length;
-      }
-      _monitor.recordWrite(path, dataSize, startTimeMilliSec);
-    }
-  }
-
-  private void recordReadFailure(String path) {
-    if (_monitor != null) {
-      _monitor.recordReadFailure(path);
-    }
-  }
-
-  private void recordWriteFailure(String path) {
-    if (_monitor != null) {
-      _monitor.recordWriteFailure(path);
-    }
+  public ZkClient(final IZkConnection zkConnection, final int 
connectionTimeout,
+      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
+    this(zkConnection, connectionTimeout, operationRetryTimeout,
+        new BasicZkSerializer(zkSerializer), null, null, null, false);
   }
 
   public static class Builder {

http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index d26a274..4748d6e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -10,6 +10,7 @@
  */
 package org.apache.helix.manager.zk.zookeeper;
 
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -19,8 +20,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
-
+import javax.management.JMException;
 import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.ExceptionUtil;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkConnection;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -33,9 +35,15 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.I0Itec.zkclient.ExceptionUtil;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.BasicZkSerializer;
+import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
+import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -49,12 +57,12 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
+ * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper.
+ * WARN: Do not use this class directly, use {@link org.apache.helix.manager.zk.ZkClient} instead.
  */
 public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
@@ -62,106 +70,51 @@ public class ZkClient implements Watcher {
   protected final IZkConnection _connection;
   protected final long operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener =
-      new ConcurrentHashMap<String, Set<IZkChildListener>>();
+      new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener =
-      new ConcurrentHashMap<String, Set<IZkDataListener>>();
-  private final Set<IZkStateListener> _stateListener = new 
CopyOnWriteArraySet<IZkStateListener>();
+      new ConcurrentHashMap<>();
+  private final Set<IZkStateListener> _stateListener = new 
CopyOnWriteArraySet<>();
   private KeeperState _currentState;
   private final ZkLock _zkEventLock = new ZkLock();
   private boolean _shutdownTriggered;
   private ZkEventThread _eventThread;
   // TODO PVo remove this later
   private Thread _zookeeperEventThread;
-  private ZkSerializer _zkSerializer;
   private volatile boolean _closed;
+  private PathBasedZkSerializer _pathBasedZkSerializer;
+  private ZkClientMonitor _monitor;
 
-  public ZkClient(String serverstring) {
-    this(serverstring, Integer.MAX_VALUE);
-  }
-
-  public ZkClient(String zkServers, int connectionTimeout) {
-    this(new ZkConnection(zkServers), connectionTimeout);
-  }
-
-  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) 
{
-    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout);
-  }
-
-  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-      ZkSerializer zkSerializer) {
-    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, 
zkSerializer);
-  }
-
-  /**
-   *
-   * @param zkServers
-   *            The Zookeeper servers
-   * @param sessionTimeout
-   *            The session timeout in milli seconds
-   * @param connectionTimeout
-   *            The connection timeout in milli seconds
-   * @param zkSerializer
-   *            The Zookeeper data serializer
-   * @param operationRetryTimeout
-   *            Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like
-   *            connection loss with the Zookeeper servers. During such failures, this
-   *            <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each
-   *            operation is retried. A value lesser than 0 is considered as
-   *            "retry forever until a connection has been reestablished".
-   */
-  public ZkClient(final String zkServers, final int sessionTimeout, final int 
connectionTimeout,
-      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
-    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, 
zkSerializer,
-        operationRetryTimeout);
-  }
-
-  public ZkClient(IZkConnection connection) {
-    this(connection, Integer.MAX_VALUE);
-  }
-
-  public ZkClient(IZkConnection connection, int connectionTimeout) {
-    this(connection, connectionTimeout, new SerializableSerializer());
-  }
 
-  public ZkClient(IZkConnection zkConnection, int connectionTimeout, 
ZkSerializer zkSerializer) {
-    this(zkConnection, connectionTimeout, zkSerializer, -1);
-  }
-
-  /**
-   *
-   * @param zkConnection
-   *            The Zookeeper servers
-   * @param connectionTimeout
-   *            The connection timeout in milli seconds
-   * @param zkSerializer
-   *            The Zookeeper data serializer
-   * @param operationRetryTimeout
-   *            Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like
-   *            connection loss with the Zookeeper servers. During such failures, this
-   *            <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each
-   *            operation is retried. A value lesser than 0 is considered as
-   *            "retry forever until a connection has been reestablished".
-   */
-  public ZkClient(final IZkConnection zkConnection, final int 
connectionTimeout,
-      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
+  protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long 
operationRetryTimeout,
+      PathBasedZkSerializer zkSerializer, String monitorType, String 
monitorKey,
+      String monitorInstanceName, boolean monitorRootPathOnly) {
     if (zkConnection == null) {
       throw new NullPointerException("Zookeeper connection is null!");
     }
     _connection = zkConnection;
-    _zkSerializer = zkSerializer;
+    _pathBasedZkSerializer = zkSerializer;
     this.operationRetryTimeoutInMillis = operationRetryTimeout;
     connect(connectionTimeout, this);
-  }
 
-  public void setZkSerializer(ZkSerializer zkSerializer) {
-    _zkSerializer = zkSerializer;
+    // initiate monitor
+    try {
+      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null 
&& !monitorType
+          .isEmpty()) {
+        _monitor =
+            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
+      } else {
+        LOG.info("ZkClient monitor key or type is not provided. Skip 
monitoring.");
+      }
+    } catch (JMException e) {
+      LOG.error("Error in creating ZkClientMonitor", e);
+    }
   }
 
   public List<String> subscribeChildChanges(String path, IZkChildListener 
listener) {
     synchronized (_childListener) {
       Set<IZkChildListener> listeners = _childListener.get(path);
       if (listeners == null) {
-        listeners = new CopyOnWriteArraySet<IZkChildListener>();
+        listeners = new CopyOnWriteArraySet<>();
         _childListener.put(path, listeners);
       }
       listeners.add(listener);
@@ -183,7 +136,7 @@ public class ZkClient implements Watcher {
     synchronized (_dataListener) {
       listeners = _dataListener.get(path);
       if (listeners == null) {
-        listeners = new CopyOnWriteArraySet<IZkDataListener>();
+        listeners = new CopyOnWriteArraySet<>();
         _dataListener.put(path, listeners);
       }
       listeners.add(listener);
@@ -446,7 +399,7 @@ public class ZkClient implements Watcher {
    * Create a node with ACL.
    *
    * @param path
-   * @param data
+   * @param datat
    * @param acl
    * @param mode
    * @return create node's path
@@ -459,21 +412,35 @@ public class ZkClient implements Watcher {
    * @throws RuntimeException
    *             if any other exception occurs
    */
-  public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) {
+  public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode)
+      throws IllegalArgumentException, ZkException {
     if (path == null) {
-      throw new NullPointerException("Missing value for path");
+      throw new NullPointerException("Path must not be null.");
     }
     if (acl == null || acl.size() == 0) {
       throw new NullPointerException("Missing value for ACL");
     }
-    final byte[] bytes = data == null ? null : serialize(data);
-
-    return retryUntilConnected(new Callable<String>() {
-      @Override public String call() throws Exception {
-        return _connection.create(path, bytes, acl, mode);
+    long startT = System.currentTimeMillis();
+    try {
+      final byte[] data = datat == null ? null : serialize(datat, path);
+      checkDataSizeLimit(data);
+      String actualPath = retryUntilConnected(new Callable<String>() {
+        @Override
+        public String call() throws Exception {
+          return _connection.create(path, data, acl, mode);
+        }
+      });
+      record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
+      return actualPath;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.WRITE);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("create, path: " + path + ", time: " + (endT - startT) + " ms");
       }
-    });
-
+    }
   }
 
   /**
@@ -556,16 +523,17 @@ public class ZkClient implements Watcher {
     return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 
-  @Override public void process(WatchedEvent event) {
+  @Override
+  public void process(WatchedEvent event) {
     LOG.debug("Received event: " + event);
     _zookeeperEventThread = Thread.currentThread();
 
     boolean stateChanged = event.getPath() == null;
     boolean znodeChanged = event.getPath() != null;
-    boolean dataChanged =
-        event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted
-            || event.getType() == EventType.NodeCreated
-            || event.getType() == EventType.NodeChildrenChanged;
+    boolean dataChanged = event.getType() == Event.EventType.NodeDataChanged
+        || event.getType() == Event.EventType.NodeDeleted
+        || event.getType() == Event.EventType.NodeCreated
+        || event.getType() == Event.EventType.NodeChildrenChanged;
 
     getEventLock().lock();
     try {
@@ -604,6 +572,10 @@ public class ZkClient implements Watcher {
         getEventLock().getDataChangedCondition().signalAll();
       }
       getEventLock().unlock();
+
+      // update state change counter.
+      recordStateChange(stateChanged, dataChanged);
+
       LOG.debug("Leaving process event");
     }
   }
@@ -622,13 +594,28 @@ public class ZkClient implements Watcher {
   }
 
   protected List<String> getChildren(final String path, final boolean watch) {
-    return retryUntilConnected(new Callable<List<String>>() {
-      @Override public List<String> call() throws Exception {
-        return _connection.getChildren(path, watch);
+    long startT = System.currentTimeMillis();
+    try {
+      List<String> children = retryUntilConnected(new Callable<List<String>>() {
+        @Override
+        public List<String> call() throws Exception {
+          return _connection.getChildren(path, watch);
+        }
+      });
+      record(path, null, startT, ZkClientMonitor.AccessType.READ);
+      return children;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.READ);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getChildren, path: " + path + ", time: " + (endT - startT) + " ms");
       }
-    });
+    }
   }
 
+
   /**
    * Counts number of children for the given path.
    *
@@ -643,16 +630,54 @@ public class ZkClient implements Watcher {
     }
   }
 
+  public boolean exists(final String path) {
+    return exists(path, hasListeners(path));
+  }
+
+
   protected boolean exists(final String path, final boolean watch) {
-    return retryUntilConnected(new Callable<Boolean>() {
-      @Override public Boolean call() throws Exception {
-        return _connection.exists(path, watch);
+    long startT = System.currentTimeMillis();
+    try {
+      boolean exists = retryUntilConnected(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return _connection.exists(path, watch);
+        }
+      });
+      record(path, null, startT, ZkClientMonitor.AccessType.READ);
+      return exists;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.READ);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms");
       }
-    });
+    }
   }
 
-  public boolean exists(final String path) {
-    return exists(path, hasListeners(path));
+  public Stat getStat(final String path) {
+    long startT = System.currentTimeMillis();
+    try {
+      Stat stat = retryUntilConnected(new Callable<Stat>() {
+        @Override
+        public Stat call() throws Exception {
+          Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+          return stat;
+        }
+      });
+      record(path, null, startT, ZkClientMonitor.AccessType.READ);
+      return stat;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.READ);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms");
+      }
+    }
   }
 
   private void processStateChanged(WatchedEvent event) {
@@ -826,6 +851,10 @@ public class ZkClient implements Watcher {
     return _dataListener.get(path);
   }
 
+  public IZkConnection getConnection() {
+    return _connection;
+  }
+
   public void waitUntilConnected() throws ZkInterruptedException {
     waitUntilConnected(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
@@ -882,8 +911,8 @@ public class ZkClient implements Watcher {
    * @throws RuntimeException
    *             if any other exception occurs from invoking the Callable
    */
-  public <T> T retryUntilConnected(Callable<T> callable)
-      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+  public <T> T retryUntilConnected(final Callable<T> callable)
+      throws IllegalArgumentException, ZkException {
     if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
       throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
     }
@@ -893,6 +922,12 @@ public class ZkClient implements Watcher {
         throw new IllegalStateException("ZkClient already closed!");
       }
       try {
+        final ZkConnection zkConnection = (ZkConnection) getConnection();
+        // Validate that the connection is not null before trigger callback
+        if (zkConnection == null || zkConnection.getZookeeper() == null) {
+          throw new IllegalStateException(
+              "ZkConnection is in invalid state! Please close this ZkClient and create new client.");
+        }
         return callable.call();
       } catch (ConnectionLossException e) {
         // we give the event thread some time to update the status to 'Disconnected'
@@ -948,30 +983,58 @@ public class ZkClient implements Watcher {
   }
 
   public boolean delete(final String path) {
+    long startT = System.currentTimeMillis();
+    boolean success;
     try {
-      retryUntilConnected(new Callable<Object>() {
-
-        @Override public Object call() throws Exception {
-          _connection.delete(path);
-          return null;
-        }
-      });
+      try {
+        retryUntilConnected(new Callable<Object>() {
 
-      return true;
-    } catch (ZkNoNodeException e) {
-      return false;
+          @Override
+          public Object call() throws Exception {
+            _connection.delete(path);
+            return null;
+          }
+        });
+        success = true;
+      } catch (ZkNoNodeException e) {
+        success = false;
+        LOG.warn("Failed to delete path " + path + ", znode does not exist!");
+      }
+      record(path, null, startT, ZkClientMonitor.AccessType.WRITE);
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.WRITE);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("delete, path: " + path + ", time: " + (endT - startT) + " ms");
+      }
     }
+    return success;
+  }
+
+  public void setZkSerializer(ZkSerializer zkSerializer) {
+    _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer);
+  }
+
+  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
+    _pathBasedZkSerializer = zkSerializer;
   }
 
-  private byte[] serialize(Object data) {
-    return _zkSerializer.serialize(data);
+  public PathBasedZkSerializer getZkSerializer() {
+    return _pathBasedZkSerializer;
   }
 
-  @SuppressWarnings("unchecked") private <T extends Object> T derializable(byte[] data) {
+  public byte[] serialize(Object data, String path) {
+    return _pathBasedZkSerializer.serialize(data, path);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T deserialize(byte[] data, String path) {
     if (data == null) {
       return null;
     }
-    return (T) _zkSerializer.deserialize(data);
+    return (T) _pathBasedZkSerializer.deserialize(data, path);
   }
 
   @SuppressWarnings("unchecked") public <T extends Object> T readData(String path) {
@@ -991,19 +1054,47 @@ public class ZkClient implements Watcher {
     return data;
   }
 
-  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, Stat stat) {
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readData(String path, Stat stat) {
     return (T) readData(path, stat, hasListeners(path));
   }
 
-  @SuppressWarnings("unchecked") protected <T extends Object> T readData(final String path,
-      final Stat stat, final boolean watch) {
-    byte[] data = retryUntilConnected(new Callable<byte[]>() {
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readData(final String path, final Stat stat, final boolean watch) {
+    long startT = System.currentTimeMillis();
+    byte[] data = null;
+    try {
+      data = retryUntilConnected(new Callable<byte[]>() {
 
-      @Override public byte[] call() throws Exception {
-        return _connection.readData(path, stat, watch);
+        @Override public byte[] call() throws Exception {
+          return _connection.readData(path, stat, watch);
+        }
+      });
+      record(path, data, startT, ZkClientMonitor.AccessType.READ);
+      return (T) deserialize(data, path);
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.READ);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getData, path: " + path + ", time: " + (endT - startT) + " ms");
       }
-    });
-    return (T) derializable(data);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readDataAndStat(String path, Stat stat,
+      boolean returnNullIfPathNotExists) {
+    T data = null;
+    try {
+      data = readData(path, stat);
+    } catch (ZkNoNodeException e) {
+      if (!returnNullIfPathNotExists) {
+        throw e;
+      }
+    }
+    return data;
   }
 
   public void writeData(String path, Object object) {
@@ -1043,16 +1134,104 @@ public class ZkClient implements Watcher {
   }
 
   public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) {
-    final byte[] data = serialize(datat);
-    return (Stat) retryUntilConnected(new Callable<Object>() {
+    long startT = System.currentTimeMillis();
+    try {
+      final byte[] data = serialize(datat, path);
+      checkDataSizeLimit(data);
+      final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
+        @Override public Object call() throws Exception {
+          return _connection.writeDataReturnStat(path, data, expectedVersion);
+        }
+      });
+      record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
+      return stat;
+    } catch (Exception e) {
+      recordFailure(path, ZkClientMonitor.AccessType.WRITE);
+      throw e;
+    } finally {
+      long endT = System.currentTimeMillis();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " ms");
+      }
+    }
+  }
+
+  public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) {
+    return writeDataReturnStat(path, datat, expectedVersion);
+  }
+
 
+  public void asyncCreate(final String path, Object datat, final CreateMode mode,
+      final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    final byte[] data = (datat == null ? null : serialize(datat, path));
+    retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        Stat stat = _connection.writeDataReturnStat(path, data, expectedVersion);
-        return stat;
+        ((ZkConnection) _connection).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+            // Arrays.asList(DEFAULT_ACL),
+            mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                data == null ? 0 : data.length, false));
+        return null;
       }
     });
   }
 
+  // Async Data Accessors
+  public void asyncSetData(final String path, Object datat, final int version,
+      final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    final byte[] data = serialize(datat, path);
+    retryUntilConnected(new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                data == null ? 0 : data.length, false));
+        return null;
+      }
+    });
+  }
+
+  public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    retryUntilConnected(new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        ((ZkConnection) _connection).getZookeeper().getData(path, null, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+        return null;
+      }
+    });
+  }
+
+  public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    retryUntilConnected(new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        ((ZkConnection) _connection).getZookeeper().exists(path, null, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
+        return null;
+      }
+    });
+  }
+
+  public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    retryUntilConnected(new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb,
+            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
+        return null;
+      }
+    });
+  }
+
+  private void checkDataSizeLimit(byte[] data) {
+    if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
+      LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
+          + new String(data).substring(0, 1024));
+      throw new HelixException("Data size larger than 1M");
+    }
+  }
+
   public void watchForData(final String path) {
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
@@ -1153,29 +1332,64 @@ public class ZkClient implements Watcher {
     }
   }
 
+  public String getServers() {
+    return _connection.getServers();
+  }
+
   /**
    * Close the client.
    *
    * @throws ZkInterruptedException
    */
   public void close() throws ZkInterruptedException {
-    if (_closed) {
-      return;
+    if (LOG.isTraceEnabled()) {
+      StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
     }
-    LOG.debug("Closing ZkClient...");
     getEventLock().lock();
     try {
+      if (_connection == null || _closed) {
+        return;
+      }
+      LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
       setShutdownTrigger(true);
       _eventThread.interrupt();
       _eventThread.join(2000);
       _connection.close();
       _closed = true;
     } catch (InterruptedException e) {
-      throw new ZkInterruptedException(e);
+      /**
+       * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will
+       * throw ZkInterruptedException and skip ZkConnection#close()
+       */
+      if (_connection != null) {
+        try {
+          /**
+           * ZkInterruptedException#construct() honors InterruptedException by calling
+           * Thread.currentThread().interrupt(); clear it first, so we can 
safely close the
+           * zk-connection
+           */
+          Thread.interrupted();
+          _connection.close();
+          /**
+           * restore interrupted status of current thread
+           */
+          Thread.currentThread().interrupt();
+        } catch (InterruptedException e1) {
+          throw new ZkInterruptedException(e1);
+        }
+      }
     } finally {
       getEventLock().unlock();
+      if (_monitor != null) {
+        _monitor.unregister();
+      }
+      LOG.info("Closed zkclient");
     }
-    LOG.debug("Closing ZkClient...done");
+  }
+
+  public boolean isClosed() {
+    return (_connection == null || !_connection.getZookeeperState().isAlive());
   }
 
   private void reconnect() {
@@ -1223,4 +1437,30 @@ public class ZkClient implements Watcher {
       }
     });
   }
+
+  // operations to update monitor's counters
+  private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) {
+    if (_monitor != null) {
+      int dataSize = (data != null) ? data.length : 0;
+      _monitor.record(path, dataSize, startTimeMilliSec, accessType);
+    }
+  }
+
+  private void recordFailure(String path, ZkClientMonitor.AccessType accessType) {
+    if (_monitor != null) {
+      _monitor.recordFailure(path, accessType);
+    }
+  }
+
+  private void recordStateChange(boolean stateChanged, boolean dataChanged) {
+    // update state change counter.
+    if (_monitor != null) {
+      if (stateChanged) {
+        _monitor.increaseStateChangeEventCounter();
+      }
+      if (dataChanged) {
+        _monitor.increaseDataChangeEventCounter();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index 639fd8a..6cdf6e7 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -31,6 +31,11 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
   public static final String MONITOR_TYPE = "Type";
   public static final String MONITOR_KEY = "Key";
 
+  public enum AccessType {
+    READ,
+    WRITE
+  }
+
   private ObjectName _objectName;
   private String _sensorName;
 
@@ -117,19 +122,30 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
     }
   }
 
-  public void recordReadFailure(String path) {
-    record(path, 0, 0, true, true);
-  }
-
-  public void recordRead(String path, int dataSize, long startTimeMilliSec) {
-    record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, true);
-  }
-
-  public void recordWriteFailure(String path) {
-    record(path, 0, 0, true, false);
+  public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
+    switch (accessType) {
+    case READ:
+      record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, true);
+      return;
+    case WRITE:
+      record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false);
+      return;
+
+    default:
+      return;
+    }
   }
 
-  public void recordWrite(String path, int dataSize, long startTimeMilliSec) {
-    record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false);
+  public void recordFailure(String path, AccessType accessType) {
+    switch (accessType) {
+    case READ:
+      record(path, 0, 0, true, true);
+      return;
+    case WRITE:
+      record(path, 0, 0, true, false);
+      return;
+    default:
+      return;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 8bf136e..1b8099c 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -98,18 +98,20 @@ public class TestZkClientMonitor {
     long eventCount = (long) _beanServer.getAttribute(name, "DataChangeEventCounter");
     Assert.assertEquals(eventCount, 1);
 
-    monitor.recordRead("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10);
+    monitor.record("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10,
+        ZkClientMonitor.AccessType.READ);
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1);
     Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadLatencyGauge.Max") >= 10);
-    monitor.recordRead("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() - 15);
+    monitor.record("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() - 15,
+        ZkClientMonitor.AccessType.READ);
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 2);
     Assert.assertEquals((long) _beanServer.getAttribute(instancesName, "ReadCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1);
     Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadTotalLatencyCounter") >= 25);
 
-    monitor.recordWrite("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5,
-        System.currentTimeMillis() - 10);
+    monitor.record("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5,
+        System.currentTimeMillis() - 10, ZkClientMonitor.AccessType.WRITE);
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, "WriteCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(currentStateName, "WriteCounter"), 1);
     Assert.assertEquals((long) _beanServer.getAttribute(currentStateName, "WriteBytesCounter"), 5);

Reply via email to