More refactoring to deduplicate code between Helix's ZkClient and the raw ZkClient.
1) Merge all duplicated (and extended) code in helix.manager.zk.ZkClient into helix.manager.zk.zookeeper.ZkClient. 2) Keep helix.manager.zk.ZkClient as a simple wrapper with all constructors and builder. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/310d4766 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/310d4766 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/310d4766 Branch: refs/heads/master Commit: 310d47660b2c31b569ddfb98058ce5877441095f Parents: 5ffab62 Author: Lei Xia <[email protected]> Authored: Tue Dec 19 10:22:37 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:32:57 2018 -0800 ---------------------------------------------------------------------- .../helix/manager/zk/ZkAsyncCallbacks.java | 6 +- .../org/apache/helix/manager/zk/ZkClient.java | 518 ++---------------- .../helix/manager/zk/zookeeper/ZkClient.java | 520 ++++++++++++++----- .../monitoring/mbeans/ZkClientMonitor.java | 40 +- .../monitoring/mbeans/TestZkClientMonitor.java | 10 +- 5 files changed, 454 insertions(+), 640 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java index 3fd0d0f..bfdf7bb 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkAsyncCallbacks.java @@ -136,9 +136,11 @@ public class ZkAsyncCallbacks { ZkAsyncCallContext zkCtx = (ZkAsyncCallContext) ctx; if (zkCtx._monitor != null) { if (zkCtx._isRead) { - zkCtx._monitor.recordRead(path, zkCtx._bytes, 
zkCtx._startTimeMilliSec); + zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec, + ZkClientMonitor.AccessType.READ); } else { - zkCtx._monitor.recordWrite(path, zkCtx._bytes, zkCtx._startTimeMilliSec); + zkCtx._monitor.record(path, zkCtx._bytes, zkCtx._startTimeMilliSec, + ZkClientMonitor.AccessType.WRITE); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java index 182c77e..618a003 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java @@ -21,47 +21,51 @@ package org.apache.helix.manager.zk; import org.I0Itec.zkclient.IZkConnection; import org.I0Itec.zkclient.ZkConnection; -import org.I0Itec.zkclient.exception.ZkException; -import org.I0Itec.zkclient.exception.ZkInterruptedException; -import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZkAsyncCallbacks.*; -import org.apache.helix.monitoring.mbeans.ZkClientMonitor; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.JMException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; /** - * ZKClient does not provide some functionalities, this will be used for quick fixes if - * any bug found in ZKClient or if we need additional features but can't wait for the new 
- * ZkClient jar Ideally we should commit the changes we do here to ZKClient. + * This is a wrapper of {@link org.apache.helix.manager.zk.zookeeper.ZkClient}, + * with additional constructors and builder. + * + * // TODO: we will need to merge two ZkClient into just one class. */ - public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient { private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000; public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000; - private PathBasedZkSerializer _zkSerializer; - private ZkClientMonitor _monitor; - - private ZkClient(IZkConnection connection, int connectionTimeout, long operationRetryTimeout, + /** + * + * @param zkConnection + * The Zookeeper connection + * @param connectionTimeout + * The connection timeout in milli seconds + * @param zkSerializer + * The Zookeeper data serializer + * @param operationRetryTimeout + * Most operations are retried in cases like connection loss with the Zookeeper servers. During such failures, this + * <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each + * operation is retried. A value lesser than 0 is considered as + * "retry forever until a connection has been reestablished". + * @param monitorType + * @param monitorKey + * @param monitorInstanceName + * These 3 inputs are used to name JMX monitor bean name for this ZkClient. + * The JMX bean name will be: HelixZkClient.monitorType.monitorKey.monitorInstanceName. + * @param monitorRootPathOnly + * Should only stat of access to root path be reported to JMX bean or path-specific stat be reported too. 
+ */ + public ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly) { - super(connection, connectionTimeout, new ByteArraySerializer(), operationRetryTimeout); - init(zkSerializer, monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); + super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType, + monitorKey, monitorInstanceName, monitorRootPathOnly); } public ZkClient(IZkConnection connection, int connectionTimeout, @@ -128,466 +132,16 @@ public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient { this(zkServers, null, null); } - protected void init(PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, - String monitorInstanceName, boolean monitorRootPathOnly) { - _zkSerializer = zkSerializer; - if (LOG.isTraceEnabled()) { - StackTraceElement[] calls = Thread.currentThread().getStackTrace(); - LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls)); - } - try { - if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType - .isEmpty()) { - _monitor = - new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); - } else { - LOG.info("ZkClient monitor key or type is not provided. 
Skip monitoring."); - } - } catch (JMException e) { - LOG.error("Error in creating ZkClientMonitor", e); - } - } - - @Override - public void setZkSerializer(ZkSerializer zkSerializer) { - _zkSerializer = new BasicZkSerializer(zkSerializer); - } - - public void setZkSerializer(PathBasedZkSerializer zkSerializer) { - _zkSerializer = zkSerializer; - } - - public PathBasedZkSerializer getZkSerializer() { - return _zkSerializer; - } - - public IZkConnection getConnection() { - return _connection; - } - - @Override - public void close() throws ZkInterruptedException { - if (LOG.isTraceEnabled()) { - StackTraceElement[] calls = Thread.currentThread().getStackTrace(); - LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls)); - } - getEventLock().lock(); - try { - if (_connection == null) { - return; - } - LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper()); - super.close(); - } catch (ZkInterruptedException e) { - /** - * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will - * throw ZkInterruptedException and skip ZkConnection#close() - */ - if (_connection != null) { - try { - /** - * ZkInterruptedException#construct() honors InterruptedException by calling - * Thread.currentThread().interrupt(); clear it first, so we can safely close the - * zk-connection - */ - Thread.interrupted(); - _connection.close(); - /** - * restore interrupted status of current thread - */ - Thread.currentThread().interrupt(); - } catch (InterruptedException e1) { - throw new ZkInterruptedException(e1); - } - } - } finally { - getEventLock().unlock(); - if (_monitor != null) { - _monitor.unregister(); - } - LOG.info("Closed zkclient"); - } - } - - public boolean isClosed() { - return (_connection == null || !_connection.getZookeeperState().isAlive()); - } - - public Stat getStat(final String path) { - long startT = System.currentTimeMillis(); - try { - Stat stat = retryUntilConnected(new Callable<Stat>() { - - @Override - 
public Stat call() throws Exception { - Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false); - return stat; - } - }); - recordRead(path, null, startT); - return stat; - } catch (Exception e) { - recordReadFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - // override exists(path, watch), so we can record all exists requests - @Override - protected boolean exists(final String path, final boolean watch) { - long startT = System.currentTimeMillis(); - try { - boolean exists = retryUntilConnected(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return _connection.exists(path, watch); - } - }); - recordRead(path, null, startT); - return exists; - } catch (Exception e) { - recordReadFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - // override getChildren(path, watch), so we can record all getChildren requests - @Override - protected List<String> getChildren(final String path, final boolean watch) { - long startT = System.currentTimeMillis(); - try { - List<String> children = retryUntilConnected(new Callable<List<String>>() { - @Override - public List<String> call() throws Exception { - return _connection.getChildren(path, watch); - } - }); - recordRead(path, null, startT); - return children; - } catch (Exception e) { - recordReadFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("getChildren, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - @SuppressWarnings("unchecked") - public <T extends Object> T deserialize(byte[] data, String path) { - if (data == null) { - return null; - } - return (T) 
_zkSerializer.deserialize(data, path); - } - - // override readData(path, stat, watch), so we can record all read requests - @Override - @SuppressWarnings("unchecked") - protected <T extends Object> T readData(final String path, final Stat stat, final boolean watch) { - long startT = System.currentTimeMillis(); - byte[] data = null; - try { - data = retryUntilConnected(new Callable<byte[]>() { - - @Override - public byte[] call() throws Exception { - return _connection.readData(path, stat, watch); - } - }); - recordRead(path, data, startT); - return (T) deserialize(data, path); - } catch (Exception e) { - recordReadFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("getData, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - @SuppressWarnings("unchecked") - public <T extends Object> T readDataAndStat(String path, Stat stat, - boolean returnNullIfPathNotExists) { - T data = null; - try { - data = readData(path, stat); - } catch (ZkNoNodeException e) { - if (!returnNullIfPathNotExists) { - throw e; - } - } - return data; - } - - public String getServers() { - return _connection.getServers(); - } - - public byte[] serialize(Object data, String path) { - return _zkSerializer.serialize(data, path); - } - - @Override - public void writeData(final String path, Object datat, final int expectedVersion) { - long startT = System.currentTimeMillis(); - try { - final byte[] data = serialize(datat, path); - checkDataSizeLimit(data); - retryUntilConnected(new Callable<Object>() { - - @Override public Object call() throws Exception { - _connection.writeData(path, data, expectedVersion); - return null; - } - }); - recordWrite(path, data, startT); - } catch (Exception e) { - recordWriteFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - 
public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) - throws InterruptedException { - long startT = System.currentTimeMillis(); - try { - final byte[] data = _zkSerializer.serialize(datat, path); - checkDataSizeLimit(data); - Stat stat = retryUntilConnected(new Callable<Stat>() { - - @Override public Stat call() throws Exception { - return ((ZkConnection) _connection).getZookeeper() - .setData(path, data, expectedVersion); - } - }); - recordWrite(path, data, startT); - return stat; - } catch (Exception e) { - recordWriteFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - @Override - public String create(final String path, Object datat, final CreateMode mode) - throws IllegalArgumentException, ZkException { - if (path == null) { - throw new NullPointerException("path must not be null."); - } - long startT = System.currentTimeMillis(); - try { - final byte[] data = datat == null ? 
null : serialize(datat, path); - checkDataSizeLimit(data); - String actualPath = retryUntilConnected(new Callable<String>() { - @Override - public String call() throws Exception { - return _connection.create(path, data, mode); - } - }); - recordWrite(path, data, startT); - return actualPath; - } catch (Exception e) { - recordWriteFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("create, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - } - - @Override - public boolean delete(final String path) { - long startT = System.currentTimeMillis(); - boolean isDeleted; - try { - try { - retryUntilConnected(new Callable<Object>() { - - @Override - public Object call() throws Exception { - _connection.delete(path); - return null; - } - }); - isDeleted = true; - } catch (ZkNoNodeException e) { - isDeleted = false; - LOG.error("Failed to delete path " + path + ", znode does not exist!"); - } - recordWrite(path, null, startT); - } catch (Exception e) { - recordWriteFailure(path); - throw e; - } finally { - long endT = System.currentTimeMillis(); - if (LOG.isTraceEnabled()) { - LOG.trace("delete, path: " + path + ", time: " + (endT - startT) + " ms"); - } - } - return isDeleted; - } - - public void asyncCreate(final String path, Object datat, final CreateMode mode, - final CreateCallbackHandler cb) { - final long startT = System.currentTimeMillis(); - final byte[] data = (datat == null ? null : serialize(datat, path)); - retryUntilConnected(new Callable<Object>() { - @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().create(path, data, Ids.OPEN_ACL_UNSAFE, - // Arrays.asList(DEFAULT_ACL), - mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, - data == null ? 
0 : data.length, false)); - return null; - } - }); - } - - public void asyncSetData(final String path, Object datat, final int version, - final SetDataCallbackHandler cb) { - final long startT = System.currentTimeMillis(); - final byte[] data = serialize(datat, path); - retryUntilConnected(new Callable<Object>() { - @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, - new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, - data == null ? 0 : data.length, false)); - return null; - } - }); - } - - public void asyncGetData(final String path, final GetDataCallbackHandler cb) { - final long startT = System.currentTimeMillis(); - retryUntilConnected(new Callable<Object>() { - @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, - new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); - return null; - } - }); - } - - public void asyncExists(final String path, final ExistsCallbackHandler cb) { - final long startT = System.currentTimeMillis(); - retryUntilConnected(new Callable<Object>() { - @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, - new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); - return null; - } - }); - } - - public void asyncDelete(final String path, final DeleteCallbackHandler cb) { - final long startT = System.currentTimeMillis(); - retryUntilConnected(new Callable<Object>() { - @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, - new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false)); - return null; - } - }); - } - - public <T> T retryUntilConnected(final Callable<T> callable) { - final ZkConnection zkConnection = (ZkConnection) getConnection(); - return super.retryUntilConnected(new Callable<T>() { - @Override - public T call() 
throws Exception { - // Validate that the connection is not null before trigger callback - if (zkConnection == null || zkConnection.getZookeeper() == null) { - throw new IllegalStateException( - "ZkConnection is in invalid state! Please close this ZkClient and create new client."); - } - return callable.call(); - } - }); - } - - private void checkDataSizeLimit(byte[] data) { - if (data != null && data.length > ZNRecord.SIZE_LIMIT) { - LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): " - + new String(data).substring(0, 1024)); - throw new HelixException("Data size larger than 1M"); - } - } - - @Override public void process(WatchedEvent event) { - boolean stateChanged = event.getPath() == null; - boolean dataChanged = event.getType() == Event.EventType.NodeDataChanged - || event.getType() == Event.EventType.NodeDeleted - || event.getType() == Event.EventType.NodeCreated - || event.getType() == Event.EventType.NodeChildrenChanged; - - if (_monitor != null) { - if (stateChanged) { - _monitor.increaseStateChangeEventCounter(); - } - if (dataChanged) { - _monitor.increaseDataChangeEventCounter(); - } - } - - super.process(event); - } - - private void recordRead(String path, byte[] data, long startTimeMilliSec) { - if (_monitor != null) { - int dataSize = 0; - if (data != null) { - dataSize = data.length; - } - _monitor.recordRead(path, dataSize, startTimeMilliSec); - } + public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, + final ZkSerializer zkSerializer, final long operationRetryTimeout) { + this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer, + operationRetryTimeout); } - private void recordWrite(String path, byte[] data, long startTimeMilliSec) { - if (_monitor != null) { - int dataSize = 0; - if (data != null) { - dataSize = data.length; - } - _monitor.recordWrite(path, dataSize, startTimeMilliSec); - } - } - - private void recordReadFailure(String path) { - if 
(_monitor != null) { - _monitor.recordReadFailure(path); - } - } - - private void recordWriteFailure(String path) { - if (_monitor != null) { - _monitor.recordWriteFailure(path); - } + public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, + final ZkSerializer zkSerializer, final long operationRetryTimeout) { + this(zkConnection, connectionTimeout, operationRetryTimeout, + new BasicZkSerializer(zkSerializer), null, null, null, false); } public static class Builder { http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index d26a274..4748d6e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -10,6 +10,7 @@ */ package org.apache.helix.manager.zk.zookeeper; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; @@ -19,8 +20,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; - +import javax.management.JMException; import org.I0Itec.zkclient.DataUpdater; +import org.I0Itec.zkclient.ExceptionUtil; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkConnection; import org.I0Itec.zkclient.IZkDataListener; @@ -33,9 +35,15 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.I0Itec.zkclient.exception.ZkTimeoutException; -import org.I0Itec.zkclient.ExceptionUtil; import 
org.I0Itec.zkclient.serialize.SerializableSerializer; import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.BasicZkSerializer; +import org.apache.helix.manager.zk.PathBasedZkSerializer; +import org.apache.helix.manager.zk.ZkAsyncCallbacks; +import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent; +import org.apache.helix.monitoring.mbeans.ZkClientMonitor; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -49,12 +57,12 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper + * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper. + * WARN: Do not use this class directly, use {@link org.apache.helix.manager.zk.ZkClient} instead. 
*/ public class ZkClient implements Watcher { private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); @@ -62,106 +70,51 @@ public class ZkClient implements Watcher { protected final IZkConnection _connection; protected final long operationRetryTimeoutInMillis; private final Map<String, Set<IZkChildListener>> _childListener = - new ConcurrentHashMap<String, Set<IZkChildListener>>(); + new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = - new ConcurrentHashMap<String, Set<IZkDataListener>>(); - private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>(); + new ConcurrentHashMap<>(); + private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<>(); private KeeperState _currentState; private final ZkLock _zkEventLock = new ZkLock(); private boolean _shutdownTriggered; private ZkEventThread _eventThread; // TODO PVo remove this later private Thread _zookeeperEventThread; - private ZkSerializer _zkSerializer; private volatile boolean _closed; + private PathBasedZkSerializer _pathBasedZkSerializer; + private ZkClientMonitor _monitor; - public ZkClient(String serverstring) { - this(serverstring, Integer.MAX_VALUE); - } - - public ZkClient(String zkServers, int connectionTimeout) { - this(new ZkConnection(zkServers), connectionTimeout); - } - - public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) { - this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout); - } - - public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, - ZkSerializer zkSerializer) { - this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer); - } - - /** - * - * @param zkServers - * The Zookeeper servers - * @param sessionTimeout - * The session timeout in milli seconds - * @param connectionTimeout - * The connection timeout in milli seconds - * @param zkSerializer - * The Zookeeper data serializer - * 
@param operationRetryTimeout - * Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like - * connection loss with the Zookeeper servers. During such failures, this - * <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each - * operation is retried. A value lesser than 0 is considered as - * "retry forever until a connection has been reestablished". - */ - public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, - final ZkSerializer zkSerializer, final long operationRetryTimeout) { - this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer, - operationRetryTimeout); - } - - public ZkClient(IZkConnection connection) { - this(connection, Integer.MAX_VALUE); - } - - public ZkClient(IZkConnection connection, int connectionTimeout) { - this(connection, connectionTimeout, new SerializableSerializer()); - } - public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) { - this(zkConnection, connectionTimeout, zkSerializer, -1); - } - - /** - * - * @param zkConnection - * The Zookeeper servers - * @param connectionTimeout - * The connection timeout in milli seconds - * @param zkSerializer - * The Zookeeper data serializer - * @param operationRetryTimeout - * Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like - * connection loss with the Zookeeper servers. During such failures, this - * <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each - * operation is retried. A value lesser than 0 is considered as - * "retry forever until a connection has been reestablished". 
- */ - public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, - final ZkSerializer zkSerializer, final long operationRetryTimeout) { + protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, + PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, + String monitorInstanceName, boolean monitorRootPathOnly) { if (zkConnection == null) { throw new NullPointerException("Zookeeper connection is null!"); } _connection = zkConnection; - _zkSerializer = zkSerializer; + _pathBasedZkSerializer = zkSerializer; this.operationRetryTimeoutInMillis = operationRetryTimeout; connect(connectionTimeout, this); - } - public void setZkSerializer(ZkSerializer zkSerializer) { - _zkSerializer = zkSerializer; + // initiate monitor + try { + if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType + .isEmpty()) { + _monitor = + new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); + } else { + LOG.info("ZkClient monitor key or type is not provided. Skip monitoring."); + } + } catch (JMException e) { + LOG.error("Error in creating ZkClientMonitor", e); + } } public List<String> subscribeChildChanges(String path, IZkChildListener listener) { synchronized (_childListener) { Set<IZkChildListener> listeners = _childListener.get(path); if (listeners == null) { - listeners = new CopyOnWriteArraySet<IZkChildListener>(); + listeners = new CopyOnWriteArraySet<>(); _childListener.put(path, listeners); } listeners.add(listener); @@ -183,7 +136,7 @@ public class ZkClient implements Watcher { synchronized (_dataListener) { listeners = _dataListener.get(path); if (listeners == null) { - listeners = new CopyOnWriteArraySet<IZkDataListener>(); + listeners = new CopyOnWriteArraySet<>(); _dataListener.put(path, listeners); } listeners.add(listener); @@ -446,7 +399,7 @@ public class ZkClient implements Watcher { * Create a node with ACL. 
* * @param path - * @param data + * @param datat * @param acl * @param mode * @return create node's path @@ -459,21 +412,35 @@ public class ZkClient implements Watcher { * @throws RuntimeException * if any other exception occurs */ - public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) { + public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode) + throws IllegalArgumentException, ZkException { if (path == null) { - throw new NullPointerException("Missing value for path"); + throw new NullPointerException("Path must not be null."); } if (acl == null || acl.size() == 0) { throw new NullPointerException("Missing value for ACL"); } - final byte[] bytes = data == null ? null : serialize(data); - - return retryUntilConnected(new Callable<String>() { - @Override public String call() throws Exception { - return _connection.create(path, bytes, acl, mode); + long startT = System.currentTimeMillis(); + try { + final byte[] data = datat == null ? 
null : serialize(datat, path); + checkDataSizeLimit(data); + String actualPath = retryUntilConnected(new Callable<String>() { + @Override + public String call() throws Exception { + return _connection.create(path, data, acl, mode); + } + }); + record(path, data, startT, ZkClientMonitor.AccessType.WRITE); + return actualPath; + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.WRITE); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("create, path: " + path + ", time: " + (endT - startT) + " ms"); } - }); - + } } /** @@ -556,16 +523,17 @@ public class ZkClient implements Watcher { return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); } - @Override public void process(WatchedEvent event) { + @Override + public void process(WatchedEvent event) { LOG.debug("Received event: " + event); _zookeeperEventThread = Thread.currentThread(); boolean stateChanged = event.getPath() == null; boolean znodeChanged = event.getPath() != null; - boolean dataChanged = - event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted - || event.getType() == EventType.NodeCreated - || event.getType() == EventType.NodeChildrenChanged; + boolean dataChanged = event.getType() == Event.EventType.NodeDataChanged + || event.getType() == Event.EventType.NodeDeleted + || event.getType() == Event.EventType.NodeCreated + || event.getType() == Event.EventType.NodeChildrenChanged; getEventLock().lock(); try { @@ -604,6 +572,10 @@ public class ZkClient implements Watcher { getEventLock().getDataChangedCondition().signalAll(); } getEventLock().unlock(); + + // update state change counter. 
+ recordStateChange(stateChanged, dataChanged); + LOG.debug("Leaving process event"); } } @@ -622,13 +594,28 @@ public class ZkClient implements Watcher { } protected List<String> getChildren(final String path, final boolean watch) { - return retryUntilConnected(new Callable<List<String>>() { - @Override public List<String> call() throws Exception { - return _connection.getChildren(path, watch); + long startT = System.currentTimeMillis(); + try { + List<String> children = retryUntilConnected(new Callable<List<String>>() { + @Override + public List<String> call() throws Exception { + return _connection.getChildren(path, watch); + } + }); + record(path, null, startT, ZkClientMonitor.AccessType.READ); + return children; + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.READ); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("getChildren, path: " + path + ", time: " + (endT - startT) + " ms"); } - }); + } } + /** * Counts number of children for the given path. 
* @@ -643,16 +630,54 @@ public class ZkClient implements Watcher { } } + public boolean exists(final String path) { + return exists(path, hasListeners(path)); + } + + protected boolean exists(final String path, final boolean watch) { - return retryUntilConnected(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - return _connection.exists(path, watch); + long startT = System.currentTimeMillis(); + try { + boolean exists = retryUntilConnected(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return _connection.exists(path, watch); + } + }); + record(path, null, startT, ZkClientMonitor.AccessType.READ); + return exists; + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.READ); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms"); } - }); + } } - public boolean exists(final String path) { - return exists(path, hasListeners(path)); + public Stat getStat(final String path) { + long startT = System.currentTimeMillis(); + try { + Stat stat = retryUntilConnected(new Callable<Stat>() { + @Override + public Stat call() throws Exception { + Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false); + return stat; + } + }); + record(path, null, startT, ZkClientMonitor.AccessType.READ); + return stat; + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.READ); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("exists, path: " + path + ", time: " + (endT - startT) + " ms"); + } + } } private void processStateChanged(WatchedEvent event) { @@ -826,6 +851,10 @@ public class ZkClient implements Watcher { return _dataListener.get(path); } + public IZkConnection getConnection() { + return _connection; + } + public void waitUntilConnected() throws ZkInterruptedException { 
waitUntilConnected(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } @@ -882,8 +911,8 @@ public class ZkClient implements Watcher { * @throws RuntimeException * if any other exception occurs from invoking the Callable */ - public <T> T retryUntilConnected(Callable<T> callable) - throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + public <T> T retryUntilConnected(final Callable<T> callable) + throws IllegalArgumentException, ZkException { if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); } @@ -893,6 +922,12 @@ public class ZkClient implements Watcher { throw new IllegalStateException("ZkClient already closed!"); } try { + final ZkConnection zkConnection = (ZkConnection) getConnection(); + // Validate that the connection is not null before trigger callback + if (zkConnection == null || zkConnection.getZookeeper() == null) { + throw new IllegalStateException( + "ZkConnection is in invalid state! 
Please close this ZkClient and create new client."); + } return callable.call(); } catch (ConnectionLossException e) { // we give the event thread some time to update the status to 'Disconnected' @@ -948,30 +983,58 @@ public class ZkClient implements Watcher { } public boolean delete(final String path) { + long startT = System.currentTimeMillis(); + boolean success; try { - retryUntilConnected(new Callable<Object>() { - - @Override public Object call() throws Exception { - _connection.delete(path); - return null; - } - }); + try { + retryUntilConnected(new Callable<Object>() { - return true; - } catch (ZkNoNodeException e) { - return false; + @Override + public Object call() throws Exception { + _connection.delete(path); + return null; + } + }); + success = true; + } catch (ZkNoNodeException e) { + success = false; + LOG.warn("Failed to delete path " + path + ", znode does not exist!"); + } + record(path, null, startT, ZkClientMonitor.AccessType.WRITE); + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.WRITE); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("delete, path: " + path + ", time: " + (endT - startT) + " ms"); + } } + return success; + } + + public void setZkSerializer(ZkSerializer zkSerializer) { + _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer); + } + + public void setZkSerializer(PathBasedZkSerializer zkSerializer) { + _pathBasedZkSerializer = zkSerializer; } - private byte[] serialize(Object data) { - return _zkSerializer.serialize(data); + public PathBasedZkSerializer getZkSerializer() { + return _pathBasedZkSerializer; } - @SuppressWarnings("unchecked") private <T extends Object> T derializable(byte[] data) { + public byte[] serialize(Object data, String path) { + return _pathBasedZkSerializer.serialize(data, path); + } + + @SuppressWarnings("unchecked") + public <T extends Object> T deserialize(byte[] data, String path) { if (data == null) { return 
null; } - return (T) _zkSerializer.deserialize(data); + return (T) _pathBasedZkSerializer.deserialize(data, path); } @SuppressWarnings("unchecked") public <T extends Object> T readData(String path) { @@ -991,19 +1054,47 @@ public class ZkClient implements Watcher { return data; } - @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, Stat stat) { + @SuppressWarnings("unchecked") + public <T extends Object> T readData(String path, Stat stat) { return (T) readData(path, stat, hasListeners(path)); } - @SuppressWarnings("unchecked") protected <T extends Object> T readData(final String path, - final Stat stat, final boolean watch) { - byte[] data = retryUntilConnected(new Callable<byte[]>() { + @SuppressWarnings("unchecked") + public <T extends Object> T readData(final String path, final Stat stat, final boolean watch) { + long startT = System.currentTimeMillis(); + byte[] data = null; + try { + data = retryUntilConnected(new Callable<byte[]>() { - @Override public byte[] call() throws Exception { - return _connection.readData(path, stat, watch); + @Override public byte[] call() throws Exception { + return _connection.readData(path, stat, watch); + } + }); + record(path, data, startT, ZkClientMonitor.AccessType.READ); + return (T) deserialize(data, path); + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.READ); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("getData, path: " + path + ", time: " + (endT - startT) + " ms"); } - }); - return (T) derializable(data); + } + } + + @SuppressWarnings("unchecked") + public <T extends Object> T readDataAndStat(String path, Stat stat, + boolean returnNullIfPathNotExists) { + T data = null; + try { + data = readData(path, stat); + } catch (ZkNoNodeException e) { + if (!returnNullIfPathNotExists) { + throw e; + } + } + return data; } public void writeData(String path, Object object) { @@ -1043,16 +1134,104 @@ public 
class ZkClient implements Watcher { } public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) { - final byte[] data = serialize(datat); - return (Stat) retryUntilConnected(new Callable<Object>() { + long startT = System.currentTimeMillis(); + try { + final byte[] data = serialize(datat, path); + checkDataSizeLimit(data); + final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + return _connection.writeDataReturnStat(path, data, expectedVersion); + } + }); + record(path, data, startT, ZkClientMonitor.AccessType.WRITE); + return stat; + } catch (Exception e) { + recordFailure(path, ZkClientMonitor.AccessType.WRITE); + throw e; + } finally { + long endT = System.currentTimeMillis(); + if (LOG.isTraceEnabled()) { + LOG.trace("setData, path: " + path + ", time: " + (endT - startT) + " ms"); + } + } + } + + public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) { + return writeDataReturnStat(path, datat, expectedVersion); + } + + public void asyncCreate(final String path, Object datat, final CreateMode mode, + final ZkAsyncCallbacks.CreateCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + final byte[] data = (datat == null ? null : serialize(datat, path)); + retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - Stat stat = _connection.writeDataReturnStat(path, data, expectedVersion); - return stat; + ((ZkConnection) _connection).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, + // Arrays.asList(DEFAULT_ACL), + mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + data == null ? 
0 : data.length, false)); + return null; } }); } + // Async Data Accessors + public void asyncSetData(final String path, Object datat, final int version, + final ZkAsyncCallbacks.SetDataCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + final byte[] data = serialize(datat, path); + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + data == null ? 0 : data.length, false)); + return null; + } + }); + } + + public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); + return null; + } + }); + } + + public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); + return null; + } + }); + } + + public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) { + final long startT = System.currentTimeMillis(); + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false)); + return null; + } + }); + } + + private void checkDataSizeLimit(byte[] data) { + if (data != null && data.length > ZNRecord.SIZE_LIMIT) { + LOG.error("Data size 
larger than 1M, will not write to zk. Data (first 1k): " + + new String(data).substring(0, 1024)); + throw new HelixException("Data size larger than 1M"); + } + } + public void watchForData(final String path) { retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { @@ -1153,29 +1332,64 @@ public class ZkClient implements Watcher { } } + public String getServers() { + return _connection.getServers(); + } + /** * Close the client. * * @throws ZkInterruptedException */ public void close() throws ZkInterruptedException { - if (_closed) { - return; + if (LOG.isTraceEnabled()) { + StackTraceElement[] calls = Thread.currentThread().getStackTrace(); + LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls)); } - LOG.debug("Closing ZkClient..."); getEventLock().lock(); try { + if (_connection == null || _closed) { + return; + } + LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper()); setShutdownTrigger(true); _eventThread.interrupt(); _eventThread.join(2000); _connection.close(); _closed = true; } catch (InterruptedException e) { - throw new ZkInterruptedException(e); + /** + * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will + * throw ZkInterruptedException and skip ZkConnection#close() + */ + if (_connection != null) { + try { + /** + * ZkInterruptedException#construct() honors InterruptedException by calling + * Thread.currentThread().interrupt(); clear it first, so we can safely close the + * zk-connection + */ + Thread.interrupted(); + _connection.close(); + /** + * restore interrupted status of current thread + */ + Thread.currentThread().interrupt(); + } catch (InterruptedException e1) { + throw new ZkInterruptedException(e1); + } + } } finally { getEventLock().unlock(); + if (_monitor != null) { + _monitor.unregister(); + } + LOG.info("Closed zkclient"); } - LOG.debug("Closing ZkClient...done"); + } + + public boolean isClosed() { + return (_connection == 
null || !_connection.getZookeeperState().isAlive()); } private void reconnect() { @@ -1223,4 +1437,30 @@ public class ZkClient implements Watcher { } }); } + + // operations to update monitor's counters + private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) { + if (_monitor != null) { + int dataSize = (data != null) ? data.length : 0; + _monitor.record(path, dataSize, startTimeMilliSec, accessType); + } + } + + private void recordFailure(String path, ZkClientMonitor.AccessType accessType) { + if (_monitor != null) { + _monitor.recordFailure(path, accessType); + } + } + + private void recordStateChange(boolean stateChanged, boolean dataChanged) { + // update state change counter. + if (_monitor != null) { + if (stateChanged) { + _monitor.increaseStateChangeEventCounter(); + } + if (dataChanged) { + _monitor.increaseDataChangeEventCounter(); + } + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java index 639fd8a..6cdf6e7 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java @@ -31,6 +31,11 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { public static final String MONITOR_TYPE = "Type"; public static final String MONITOR_KEY = "Key"; + public enum AccessType { + READ, + WRITE + } + private ObjectName _objectName; private String _sensorName; @@ -117,19 +122,30 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { } } - public void recordReadFailure(String path) { - record(path, 0, 0, true, true); - } - - public void 
recordRead(String path, int dataSize, long startTimeMilliSec) { - record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, true); - } - - public void recordWriteFailure(String path) { - record(path, 0, 0, true, false); + public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) { + switch (accessType) { + case READ: + record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, true); + return; + case WRITE: + record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false); + return; + + default: + return; + } } - public void recordWrite(String path, int dataSize, long startTimeMilliSec) { - record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false); + public void recordFailure(String path, AccessType accessType) { + switch (accessType) { + case READ: + record(path, 0, 0, true, true); + return; + case WRITE: + record(path, 0, 0, true, false); + return; + default: + return; + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/310d4766/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java index 8bf136e..1b8099c 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java @@ -98,18 +98,20 @@ public class TestZkClientMonitor { long eventCount = (long) _beanServer.getAttribute(name, "DataChangeEventCounter"); Assert.assertEquals(eventCount, 1); - monitor.recordRead("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10); + monitor.record("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10, + 
ZkClientMonitor.AccessType.READ); Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1); Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadLatencyGauge.Max") >= 10); - monitor.recordRead("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() - 15); + monitor.record("TEST/INSTANCES/testDB0", 0, System.currentTimeMillis() - 15, + ZkClientMonitor.AccessType.READ); Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 2); Assert.assertEquals((long) _beanServer.getAttribute(instancesName, "ReadCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(idealStateName, "ReadCounter"), 1); Assert.assertTrue((long) _beanServer.getAttribute(rootName, "ReadTotalLatencyCounter") >= 25); - monitor.recordWrite("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5, - System.currentTimeMillis() - 10); + monitor.record("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5, + System.currentTimeMillis() - 10, ZkClientMonitor.AccessType.WRITE); Assert.assertEquals((long) _beanServer.getAttribute(rootName, "WriteCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(currentStateName, "WriteCounter"), 1); Assert.assertEquals((long) _beanServer.getAttribute(currentStateName, "WriteBytesCounter"), 5);
