Repository: helix
Updated Branches:
  refs/heads/master e1ca65193 -> 8a6ac8ff2


[HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor

Metric names have been changed to follow Helix's naming convention, and the
cluster-level aggregate metrics are now computed by summing over all
ResourceMonitors instead of being maintained through delta updates.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a6ac8ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a6ac8ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a6ac8ff

Branch: refs/heads/master
Commit: 8a6ac8ff278aa9b4ad8445700266af820d0d62cc
Parents: e1ca651
Author: Hunter Lee <[email protected]>
Authored: Mon Jul 9 12:24:46 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Mon Jul 9 13:52:19 2018 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java |  59 +++++------
 .../mbeans/ClusterStatusMonitorMBean.java       |   8 +-
 .../monitoring/mbeans/ResourceMonitor.java      | 101 ++++++-------------
 .../mbeans/TestClusterAggregateMetrics.java     |  16 ++-
 4 files changed, 69 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 954ae7d..b2c5fb0 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -68,12 +68,6 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
-  // Aggregate metrics from ResourceMonitors
-  private volatile long _totalPartitionCount = 0;
-  private volatile long _totalErrorPartitionCount = 0;
-  private volatile long _totalPartitionsWithoutTopStateCount = 0;
-  private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0;
-
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = 
new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = 
new ConcurrentHashMap<>();
 
@@ -460,7 +454,7 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
-                new ResourceMonitor(this, _clusterName, resourceName, 
getObjectName(beanName));
+                new ResourceMonitor(_clusterName, resourceName, 
getObjectName(beanName));
             bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
@@ -803,39 +797,38 @@ public class ClusterStatusMonitor implements 
ClusterStatusMonitorMBean {
   }
 
   @Override
-  public long getTotalPartitionCount() {
-    return _totalPartitionCount;
+  public long getTotalPartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : 
_resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getPartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalErrorPartitionCount() {
-    return _totalErrorPartitionCount;
+  public long getErrorPartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : 
_resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getErrorPartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalPartitionsWithoutTopStateCount() {
-    return _totalPartitionsWithoutTopStateCount;
+  public long getMissingTopStatePartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : 
_resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getMissingTopStatePartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalExternalViewIdealStateMismatchPartitionCount() {
-    return _totalExternalViewIdealStateMismatchPartitionCount;
-  }
-
-  synchronized void applyDeltaToTotalPartitionCount(long delta) {
-    _totalPartitionCount += delta;
-  }
-
-  synchronized void applyDeltaToTotalErrorPartitionCount(long delta) {
-    _totalErrorPartitionCount += delta;
-  }
-
-  synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long 
delta) {
-    _totalPartitionsWithoutTopStateCount += delta;
-  }
-
-  synchronized void 
applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta) {
-    _totalExternalViewIdealStateMismatchPartitionCount += delta;
+  public long getDifferenceWithIdealStateGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : 
_resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getDifferenceWithIdealStateGauge();
+    }
+    return total;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 6ace495..81600cb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -84,20 +84,20 @@ public interface ClusterStatusMonitorMBean extends 
SensorNameProvider {
   /**
    * @return number of all partitions in this cluster
    */
-  long getTotalPartitionCount();
+  long getTotalPartitionGauge();
 
   /**
    * @return number of all partitions in this cluster that have errors
    */
-  long getTotalErrorPartitionCount();
+  long getErrorPartitionGauge();
 
   /**
    * @return number of all partitions in this cluster without any top-state 
replicas
    */
-  long getTotalPartitionsWithoutTopStateCount();
+  long getMissingTopStatePartitionGauge();
 
   /**
    * @return number of all partitions in this cluster whose ExternalView and 
IdealState have discrepancies
    */
-  long getTotalExternalViewIdealStateMismatchPartitionCount();
+  long getDifferenceWithIdealStateGauge();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 125257b..d4b46b1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -55,15 +55,16 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _successTopStateHandoffCounter;
   private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter;
   private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration;
-  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
   private SimpleDynamicMetric<Long> _totalMessageReceived;
 
+  // Histograms
+  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
   private final String _resourceName;
   private final String _clusterName;
   private final ObjectName _initObjectName;
-  private ClusterStatusMonitor _clusterStatusMonitor;
 
   @Override
   public ResourceMonitor register() throws JMException {
@@ -89,70 +90,48 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return this;
   }
 
-  @Override
-  public synchronized void unregister() {
-    super.unregister();
-    // Also remove metrics propagated to aggregate metrics in 
ClusterStatusMonitor
-    if (_clusterStatusMonitor != null) {
-      
_clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue());
-      
_clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue());
-      
_clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
-          -_externalViewIdealStateDiff.getValue());
-      
_clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue());
-    }
-  }
-
   public enum MonitorState {
     TOP_STATE
   }
 
   public ResourceMonitor(String clusterName, String resourceName, ObjectName 
objectName) {
-    this(null, clusterName, resourceName, objectName);
-  }
-
-  public ResourceMonitor(ClusterStatusMonitor clusterStatusMonitor, String 
clusterName, String resourceName,
-      ObjectName objectName) {
-    if (clusterStatusMonitor == null) {
-      _logger.warn("ResourceMonitor initialized without a reference to 
ClusterStatusMonitor (null): metrics will not "
-          + "be aggregated at the cluster level.");
-    }
-    _clusterStatusMonitor = clusterStatusMonitor;
     _clusterName = clusterName;
     _resourceName = resourceName;
     _initObjectName = objectName;
 
-    _externalViewIdealStateDiff = new 
SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l);
+    _externalViewIdealStateDiff = new 
SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
     _numLoadRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L);
     _numRecoveryRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 
0l);
+        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 
0L);
     _numPendingLoadRebalancePartitions =
-        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L);
     _numPendingRecoveryRebalancePartitions =
-        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
-    _numLessReplicaPartitions = new 
SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L);
+    _numLessReplicaPartitions = new 
SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L);
     _numLessMinActiveReplicaPartitions =
-        new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
-    _numNonTopStatePartitions = new 
SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l);
-    _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l);
-    _numOfPartitionsInExternalView = new 
SimpleDynamicMetric("ExternalViewPartitionGauge", 0l);
-    _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l);
+        new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L);
+    _numNonTopStatePartitions = new 
SimpleDynamicMetric("MissingTopStatePartitionGauge", 0L);
+    _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0L);
+    _numOfPartitionsInExternalView = new 
SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
+    _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
 
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", 
new Histogram(
             new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, 
TimeUnit.MILLISECONDS)));
-    _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 
0l);
+    _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 
0L);
     _maxSinglePartitionTopStateHandoffDuration =
-        new 
SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
-    _failedTopStateHandoffCounter = new 
SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
-    _successTopStateHandoffCounter = new 
SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+        new 
SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
+    _failedTopStateHandoffCounter = new 
SimpleDynamicMetric("FailedTopStateHandoffCounter", 0L);
+    _successTopStateHandoffCounter = new 
SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
     _successfulTopStateHandoffDurationCounter =
-        new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 
0l);
+        new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 
0L);
   }
 
   @Override
   public String getSensorName() {
-    return String.format("%s.%s.%s.%s", 
ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, _resourceName);
+    return String.format("%s.%s.%s.%s", 
ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+        _tag, _resourceName);
   }
 
   public long getPartitionGauge() {
@@ -293,17 +272,6 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       }
     }
 
-    // Update cluster-level aggregate metrics in ClusterStatusMonitor
-    if (_clusterStatusMonitor != null) {
-      _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() 
- _numOfPartitions.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(
-          numOfErrorPartitions - _numOfErrorPartitions.getValue());
-      
_clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
-          numOfDiff - _externalViewIdealStateDiff.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(
-          (partitions.size() - numOfPartitionWithTopState) - 
_numNonTopStatePartitions.getValue());
-    }
-
     // Update resource-level metrics
     _numOfPartitions.updateValue((long) partitions.size());
     _numOfErrorPartitions.updateValue(numOfErrorPartitions);
@@ -320,21 +288,18 @@ public class ResourceMonitor extends DynamicMBeanProvider 
{
   }
 
   private void resetGauges() {
-    // Disable reset for the following gauges:
-    // 1) Need the previous values for these gauges to compute delta for 
cluster-level metrics.
-    // 2) These four gauges are reset every time updateResource is called 
anyway.
-    //_numOfErrorPartitions.updateValue(0l);
-    //_numNonTopStatePartitions.updateValue(0l);
-    //_externalViewIdealStateDiff.updateValue(0l);
-    //_numOfPartitionsInExternalView.updateValue(0l);
+    _numOfErrorPartitions.updateValue(0L);
+    _numNonTopStatePartitions.updateValue(0L);
+    _externalViewIdealStateDiff.updateValue(0L);
+    _numOfPartitionsInExternalView.updateValue(0L);
 
     // The following gauges are computed each call to updateResource by way of 
looping so need to be reset.
-    _numLessMinActiveReplicaPartitions.updateValue(0l);
-    _numLessReplicaPartitions.updateValue(0l);
-    _numPendingRecoveryRebalancePartitions.updateValue(0l);
-    _numPendingLoadRebalancePartitions.updateValue(0l);
-    _numRecoveryRebalanceThrottledPartitions.updateValue(0l);
-    _numLoadRebalanceThrottledPartitions.updateValue(0l);
+    _numLessMinActiveReplicaPartitions.updateValue(0L);
+    _numLessReplicaPartitions.updateValue(0L);
+    _numPendingRecoveryRebalancePartitions.updateValue(0L);
+    _numPendingLoadRebalancePartitions.updateValue(0L);
+    _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
+    _numLoadRebalanceThrottledPartitions.updateValue(0L);
   }
 
   public void updateStateHandoffStats(MonitorState monitorState, long 
duration, boolean succeeded) {
@@ -398,8 +363,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= 
System.currentTimeMillis()) {
-      _maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
+      _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);
       _lastResetTime = System.currentTimeMillis();
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
index ba3b654..dfe5016 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
@@ -61,10 +61,10 @@ public class TestClusterAggregateMetrics extends 
ZkIntegrationTestBase {
   private static final int NUM_PARTITIONS = 5;
   private static final int NUM_REPLICAS = 3;
 
-  private static final String PARTITION_COUNT = "TotalPartitionCount";
-  private static final String ERROR_PARTITION_COUNT = 
"TotalErrorPartitionCount";
-  private static final String WITHOUT_TOPSTATE_COUNT = 
"TotalPartitionsWithoutTopStateCount";
-  private static final String IS_EV_MISMATCH_COUNT = 
"TotalExternalViewIdealStateMismatchPartitionCount";
+  private static final String PARTITION_COUNT = "TotalPartitionGauge";
+  private static final String ERROR_PARTITION_COUNT = "ErrorPartitionGauge";
+  private static final String WITHOUT_TOPSTATE_COUNT = 
"MissingTopStatePartitionGauge";
+  private static final String IS_EV_MISMATCH_COUNT = 
"DifferenceWithIdealStateGauge";
 
   private static final int START_PORT = 12918;
   private static final String STATE_MODEL = "MasterSlave";
@@ -82,15 +82,11 @@ public class TestClusterAggregateMetrics extends 
ZkIntegrationTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
     _setupTool = new ClusterSetup(ZK_ADDR);
-
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
     _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, 
STATE_MODEL);
+
     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
@@ -145,7 +141,7 @@ public class TestClusterAggregateMetrics extends 
ZkIntegrationTestBase {
   }
 
   @Test
-  public void testAggregateMetrics() throws InterruptedException {
+  public void testAggregateMetrics() throws Exception {
     // Everything should be up and running initially with 5 total partitions
     updateMetrics();
     Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);

Reply via email to