Add monitor to ZkClient to monitor the pending callbacks.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fde1a6a4 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fde1a6a4 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fde1a6a4 Branch: refs/heads/master Commit: fde1a6a441a1fcfac6a8c6f1849d54bb34291643 Parents: b3ecd2a Author: Lei Xia <[email protected]> Authored: Thu Dec 21 10:19:32 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:33:20 2018 -0800 ---------------------------------------------------------------------- .../apache/helix/manager/zk/zookeeper/ZkClient.java | 2 +- .../helix/manager/zk/zookeeper/ZkEventThread.java | 8 ++++++-- .../helix/monitoring/mbeans/ZkClientMonitor.java | 15 +++++++++++++++ .../monitoring/mbeans/ZkClientMonitorMBean.java | 1 + 4 files changed, 23 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index 4748d6e..8e0a379 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -35,7 +35,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.I0Itec.zkclient.exception.ZkTimeoutException; -import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; @@ -102,6 +101,7 @@ public class ZkClient implements Watcher { .isEmpty()) { _monitor = new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly); + _monitor.setZkEventThread(_eventThread); } else { LOG.info("ZkClient monitor key or type is not provided. Skip monitoring."); } http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java index dcf7019..968f7f3 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java @@ -26,10 +26,10 @@ import org.slf4j.LoggerFactory; * it is waiting for something). {@link ZkClient} would then for instance not be able to maintain it's connection state * anymore. */ -class ZkEventThread extends Thread { +public class ZkEventThread extends Thread { private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); - private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>(); + private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<>(); private static AtomicInteger _eventId = new AtomicInteger(0); @@ -82,4 +82,8 @@ class ZkEventThread extends Thread { _events.add(event); } } + + public int getPendingEventsCount() { + return _events.size(); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java index 6cdf6e7..3e4022f 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java @@ -26,6 +26,7 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.manager.zk.zookeeper.ZkEventThread; public class ZkClientMonitor implements ZkClientMonitorMBean { public static final String MONITOR_TYPE = "Type"; @@ -41,6 +42,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { private long _stateChangeEventCounter; private long _dataChangeEventCounter; + private ZkEventThread _zkEventThread; private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> _zkClientPathMonitorMap = new ConcurrentHashMap<>(); @@ -68,6 +70,10 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { } } + public void setZkEventThread(ZkEventThread zkEventThread) { + _zkEventThread = zkEventThread; + } + protected static ObjectName getObjectName(String monitorType, String monitorKey, String monitorInstanceName) throws MalformedObjectNameException { return MBeanRegistrar @@ -109,6 +115,15 @@ public class ZkClientMonitor implements ZkClientMonitorMBean { return _dataChangeEventCounter; } + @Override + public long getPendingCallbackGauge() { + if (_zkEventThread != null) { + return _zkEventThread.getPendingEventsCount(); + } + + return -1; + } + private void record(String path, int bytes, long latencyMilliSec, boolean isFailure, boolean isRead) { for (ZkClientPathMonitor.PredefinedPath predefinedPath : ZkClientPathMonitor.PredefinedPath http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java index a260f71..17bfa25 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java @@ -24,4 +24,5 @@ import org.apache.helix.monitoring.SensorNameProvider; public interface ZkClientMonitorMBean extends SensorNameProvider { long getStateChangeEventCounter(); long getDataChangeEventCounter(); + long getPendingCallbackGauge(); }
