Add a RoutingTableProvider monitor for tracking refresh actions and routing 
table update callbacks.

The monitor contains the following metrics:
DataRefreshLatencyGauge
CallbackCounter
EventQueueSizeGauge
DataRefreshCounter

Also add tests for this monitor.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/993beb38
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/993beb38
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/993beb38

Branch: refs/heads/master
Commit: 993beb3834f4013de8d6d8221bd71ccdced93632
Parents: 3deeeab
Author: Jiajun Wang <[email protected]>
Authored: Thu Jul 12 10:33:21 2018 -0700
Committer: jiajunwang <[email protected]>
Committed: Tue Jul 17 11:57:12 2018 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/MonitorDomainNames.java   |   1 +
 .../mbeans/RoutingTableProviderMonitor.java     | 100 +++++++++++++++++++
 .../helix/spectator/RoutingTableProvider.java   |  28 +++++-
 ...TestRoutingTableProviderPeriodicRefresh.java |   4 +-
 .../mbeans/TestRoutingTableProviderMonitor.java | 100 +++++++++++++++++++
 5 files changed, 228 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index c28570d..73bf057 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -27,5 +27,6 @@ public enum MonitorDomainNames {
   HelixZkClient,
   HelixThreadPoolExecutor,
   HelixCallback,
+  RoutingTableProvider,
   CLMParticipantReport
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
new file mode 100644
index 0000000..1c64783
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import org.apache.helix.PropertyType;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class RoutingTableProviderMonitor extends DynamicMBeanProvider {
+  public static final String DATA_TYPE_KEY = "DataType";
+  public static final String CLUSTER_KEY = "Cluster";
+  public static final String DEFAULT = "DEFAULT";
+
+  private static final String MBEAN_DESCRIPTION = "Helix RoutingTableProvider 
Monitor";
+  private final String _sensorName;
+  private final PropertyType _propertyType;
+  private final String _clusterName;
+
+  private SimpleDynamicMetric<Long> _callbackCounter;
+  private SimpleDynamicMetric<Long> _eventQueueSizeGauge;
+  private SimpleDynamicMetric<Long> _dataRefreshCounter;
+  private HistogramDynamicMetric _dataRefreshLatencyGauge;
+
+  public RoutingTableProviderMonitor(final PropertyType propertyType, String 
clusterName) {
+    _propertyType = propertyType;
+    _clusterName = clusterName == null ? DEFAULT : clusterName;
+
+    // Don't put instanceName into sensor name. This detail information is in 
the MBean name already.
+    _sensorName = String
+        .format("%s.%s.%s", MonitorDomainNames.RoutingTableProvider.name(), 
_clusterName,
+            _propertyType.name());
+
+    _dataRefreshLatencyGauge = new 
HistogramDynamicMetric("DataRefreshLatencyGauge", new Histogram(
+        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, 
TimeUnit.MILLISECONDS)));
+    _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0l);
+    _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0l);
+    _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0l);
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  private ObjectName getMBeanName() throws MalformedObjectNameException {
+    return new ObjectName(String
+        .format("%s:%s=%s,%s=%s", 
MonitorDomainNames.RoutingTableProvider.name(), CLUSTER_KEY,
+            _clusterName, DATA_TYPE_KEY, _propertyType.name()));
+  }
+
+  public void increaseCallbackCounters(long currentQueueSize) {
+    _callbackCounter.updateValue(_callbackCounter.getValue() + 1);
+    _eventQueueSizeGauge.updateValue(currentQueueSize);
+  }
+
+  public void increaseDataRefreshCounters(long startTime) {
+    _dataRefreshCounter.updateValue(_dataRefreshCounter.getValue() + 1);
+    _dataRefreshLatencyGauge.updateValue(System.currentTimeMillis() - 
startTime);
+  }
+
+  @Override
+  public RoutingTableProviderMonitor register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_dataRefreshLatencyGauge);
+    attributeList.add(_callbackCounter);
+    attributeList.add(_eventQueueSizeGauge);
+    attributeList.add(_dataRefreshCounter);
+
+    doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName());
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 57b2fad..cc373db 100644
--- 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.management.JMException;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -52,6 +54,7 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +68,7 @@ public class RoutingTableProvider
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
   private final Map<RoutingTableChangeListener, ListenerContext> 
_routingTableChangeListenerMap;
+  private final RoutingTableProviderMonitor _monitor;
 
   // For periodic refresh
   private long _lastRefreshTimestamp;
@@ -101,6 +105,14 @@ public class RoutingTableProvider
     _sourceDataType = sourceDataType;
     _routingTableChangeListenerMap = new ConcurrentHashMap<>();
     String clusterName = _helixManager != null ? 
_helixManager.getClusterName() : null;
+
+    _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
+    try {
+      _monitor.register();
+    } catch (JMException e) {
+      logger.error("Failed to register RoutingTableProvider monitor MBean.", 
e);
+    }
+
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
 
@@ -189,6 +201,9 @@ public class RoutingTableProvider
       _periodicRefreshExecutor.shutdown();
     }
     _routerUpdater.shutdown();
+
+    _monitor.unregister();
+
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = 
_helixManager.getHelixDataAccessor().keyBuilder();
       switch (_sourceDataType) {
@@ -511,7 +526,7 @@ public class RoutingTableProvider
     _routingTableRef.set(newRoutingTable);
   }
 
-  public void refresh(List<ExternalView> externalViewList, NotificationContext 
changeContext) {
+  protected void refresh(List<ExternalView> externalViewList, 
NotificationContext changeContext) {
     HelixDataAccessor accessor = 
changeContext.getManager().getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -520,7 +535,7 @@ public class RoutingTableProvider
     refresh(externalViewList, configList, liveInstances);
   }
 
-  public void refresh(Collection<ExternalView> externalViews,
+  protected void refresh(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, 
instanceConfigs, liveInstances);
@@ -581,6 +596,9 @@ public class RoutingTableProvider
           logger.error(String.format("HelixManager is not connected for router 
update event: %s", event));
           throw new HelixException("HelixManager is not connected for router 
update event.");
         }
+
+        long startTime = System.currentTimeMillis();
+
         _dataCache.refresh(manager.getHelixDataAccessor());
         switch (_sourceDataType) {
           case EXTERNALVIEW:
@@ -599,6 +617,8 @@ public class RoutingTableProvider
             logger.warn("Unsupported source data type: {}, stop refreshing the 
routing table!",
                 _sourceDataType);
         }
+
+        _monitor.increaseDataRefreshCounters(startTime);
       }
     }
 
@@ -616,6 +636,8 @@ public class RoutingTableProvider
       event.addAttribute(AttributeName.helixmanager.name(), 
context.getManager());
       event.addAttribute(AttributeName.changeContext.name(), context);
       queueEvent(event);
+
+      _monitor.increaseCallbackCounters(_eventQueue.size());
     }
   }
 
@@ -630,4 +652,4 @@ public class RoutingTableProvider
       return _context;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
index c78b8e6..e2b6df1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -142,7 +142,7 @@ public class TestRoutingTableProviderPeriodicRefresh 
extends ZkTestBase {
     }
 
     @Override
-    public synchronized void refresh(List<ExternalView> externalViewList,
+    protected synchronized void refresh(List<ExternalView> externalViewList,
         NotificationContext changeContext) {
       super.refresh(externalViewList, changeContext);
       _refreshCount++;
@@ -152,7 +152,7 @@ public class TestRoutingTableProviderPeriodicRefresh 
extends ZkTestBase {
     }
 
     @Override
-    public synchronized void refresh(Collection<ExternalView> externalViews,
+    protected synchronized void refresh(Collection<ExternalView> externalViews,
         Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
       super.refresh(externalViews, instanceConfigs, liveInstances);
       _refreshCount++;

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
new file mode 100644
index 0000000..05240c1
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.PropertyType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestRoutingTableProviderMonitor {
+
+  private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+  private final String TEST_CLUSTER = "test_cluster";
+
+  private ObjectName buildObjectName(PropertyType type, String cluster)
+      throws MalformedObjectNameException {
+    return 
MBeanRegistrar.buildObjectName(MonitorDomainNames.RoutingTableProvider.name(),
+        RoutingTableProviderMonitor.CLUSTER_KEY, cluster, 
RoutingTableProviderMonitor.DATA_TYPE_KEY,
+        type.name());
+  }
+
+  private ObjectName buildObjectName(PropertyType type, String cluster, int 
num)
+      throws MalformedObjectNameException {
+    ObjectName objectName = buildObjectName(type, cluster);
+    if (num > 0) {
+      return new ObjectName(String
+          .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE,
+              String.valueOf(num)));
+    } else {
+      return objectName;
+    }
+  }
+
+  @Test
+  public void testMBeanRegisteration() throws JMException {
+    Set<RoutingTableProviderMonitor> monitors = new HashSet<>();
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, 
TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER)));
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, 
TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER, 1)));
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, 
TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER, 2)));
+    }
+
+    // Un-register all monitors
+    for (RoutingTableProviderMonitor monitor : monitors) {
+      monitor.unregister();
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER)));
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER, 1)));
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, 
TEST_CLUSTER, 2)));
+    }
+  }
+
+  @Test
+  public void testMetrics() throws JMException, InterruptedException {
+    PropertyType type = PropertyType.EXTERNALVIEW;
+    RoutingTableProviderMonitor monitor = new 
RoutingTableProviderMonitor(type, TEST_CLUSTER);
+    monitor.register();
+    ObjectName name = buildObjectName(type, TEST_CLUSTER);
+
+    monitor.increaseCallbackCounters(10);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"CallbackCounter"), 1);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"EventQueueSizeGauge"), 10);
+    monitor.increaseCallbackCounters(15);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"CallbackCounter"), 2);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"EventQueueSizeGauge"), 15);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshLatencyGauge.Max"), 0);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshCounter"), 0);
+
+    long startTime = System.currentTimeMillis();
+    Thread.sleep(5);
+    monitor.increaseDataRefreshCounters(startTime);
+    long latency = (long) _beanServer.getAttribute(name, 
"DataRefreshLatencyGauge.Max");
+    Assert.assertTrue(latency >= 5 && latency <= System.currentTimeMillis() - 
startTime);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshCounter"), 1);
+
+    monitor.increaseDataRefreshCounters(startTime);
+    long newLatency = (long) _beanServer.getAttribute(name, 
"DataRefreshLatencyGauge.Max");
+    Assert.assertTrue(newLatency >= latency);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, 
"DataRefreshCounter"), 2);
+
+    monitor.unregister();
+  }
+}

Reply via email to