Add monitor to ZkClient to monitor the pending callbacks.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fde1a6a4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fde1a6a4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fde1a6a4

Branch: refs/heads/master
Commit: fde1a6a441a1fcfac6a8c6f1849d54bb34291643
Parents: b3ecd2a
Author: Lei Xia <[email protected]>
Authored: Thu Dec 21 10:19:32 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:33:20 2018 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/zookeeper/ZkClient.java  |  2 +-
 .../helix/manager/zk/zookeeper/ZkEventThread.java    |  8 ++++++--
 .../helix/monitoring/mbeans/ZkClientMonitor.java     | 15 +++++++++++++++
 .../monitoring/mbeans/ZkClientMonitorMBean.java      |  1 +
 4 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 4748d6e..8e0a379 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -35,7 +35,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
@@ -102,6 +101,7 @@ public class ZkClient implements Watcher {
           .isEmpty()) {
         _monitor =
             new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, 
monitorRootPathOnly);
+        _monitor.setZkEventThread(_eventThread);
       } else {
         LOG.info("ZkClient monitor key or type is not provided. Skip 
monitoring.");
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
index dcf7019..968f7f3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
@@ -26,10 +26,10 @@ import org.slf4j.LoggerFactory;
  * it is waiting for something). {@link ZkClient} would then for instance not 
be able to maintain it's connection state
  * anymore.
  */
-class ZkEventThread extends Thread {
+public class ZkEventThread extends Thread {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
 
-  private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>();
+  private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<>();
 
   private static AtomicInteger _eventId = new AtomicInteger(0);
 
@@ -82,4 +82,8 @@ class ZkEventThread extends Thread {
       _events.add(event);
     }
   }
+
+  public int getPendingEventsCount() {
+    return _events.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index 6cdf6e7..3e4022f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -26,6 +26,7 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
 
 public class ZkClientMonitor implements ZkClientMonitorMBean {
   public static final String MONITOR_TYPE = "Type";
@@ -41,6 +42,7 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
 
   private long _stateChangeEventCounter;
   private long _dataChangeEventCounter;
+  private ZkEventThread _zkEventThread;
 
   private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> 
_zkClientPathMonitorMap =
       new ConcurrentHashMap<>();
@@ -68,6 +70,10 @@ public class ZkClientMonitor implements ZkClientMonitorMBean 
{
     }
   }
 
+  public void setZkEventThread(ZkEventThread zkEventThread) {
+    _zkEventThread = zkEventThread;
+  }
+
   protected static ObjectName getObjectName(String monitorType, String 
monitorKey,
       String monitorInstanceName) throws MalformedObjectNameException {
     return MBeanRegistrar
@@ -109,6 +115,15 @@ public class ZkClientMonitor implements 
ZkClientMonitorMBean {
     return _dataChangeEventCounter;
   }
 
+  @Override
+  public long getPendingCallbackGauge() {
+    if (_zkEventThread != null) {
+      return _zkEventThread.getPendingEventsCount();
+    }
+
+    return -1;
+  }
+
   private void record(String path, int bytes, long latencyMilliSec, boolean 
isFailure,
       boolean isRead) {
     for (ZkClientPathMonitor.PredefinedPath predefinedPath : 
ZkClientPathMonitor.PredefinedPath

http://git-wip-us.apache.org/repos/asf/helix/blob/fde1a6a4/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
index a260f71..17bfa25 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
@@ -24,4 +24,5 @@ import org.apache.helix.monitoring.SensorNameProvider;
 public interface ZkClientMonitorMBean extends SensorNameProvider {
   long getStateChangeEventCounter();
   long getDataChangeEventCounter();
+  long getPendingCallbackGauge();
 }

Reply via email to