Repository: helix Updated Branches: refs/heads/master e1ca65193 -> 8a6ac8ff2
[HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor Names of the metrics have been fixed per Helix's convention and loops are now used instead of using delta values. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a6ac8ff Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a6ac8ff Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a6ac8ff Branch: refs/heads/master Commit: 8a6ac8ff278aa9b4ad8445700266af820d0d62cc Parents: e1ca651 Author: Hunter Lee <[email protected]> Authored: Mon Jul 9 12:24:46 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Mon Jul 9 13:52:19 2018 -0700 ---------------------------------------------------------------------- .../monitoring/mbeans/ClusterStatusMonitor.java | 59 +++++------ .../mbeans/ClusterStatusMonitorMBean.java | 8 +- .../monitoring/mbeans/ResourceMonitor.java | 101 ++++++------------- .../mbeans/TestClusterAggregateMetrics.java | 16 ++- 4 files changed, 69 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 954ae7d..b2c5fb0 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -68,12 +68,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private boolean _rebalanceFailure = false; private AtomicLong _rebalanceFailureCount = new AtomicLong(0L); - // Aggregate metrics from ResourceMonitors - private volatile long _totalPartitionCount = 0; - private volatile long _totalErrorPartitionCount = 0; - private volatile long _totalPartitionsWithoutTopStateCount = 0; - private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0; - private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>(); @@ -460,7 +454,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { if (!_resourceMbeanMap.containsKey(resourceName)) { String beanName = getResourceBeanName(resourceName); ResourceMonitor bean = - new ResourceMonitor(this, _clusterName, resourceName, getObjectName(beanName)); + new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName)); bean.register(); _resourceMbeanMap.put(resourceName, bean); } @@ -803,39 +797,38 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } @Override - public long getTotalPartitionCount() { - return _totalPartitionCount; + public long getTotalPartitionGauge() { + long total = 0; + for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) { + total += entry.getValue().getPartitionGauge(); + } + return total; } @Override - public long getTotalErrorPartitionCount() { - return _totalErrorPartitionCount; + public long getErrorPartitionGauge() { + long total = 0; + for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) { + total += entry.getValue().getErrorPartitionGauge(); + } + return total; } @Override - public long getTotalPartitionsWithoutTopStateCount() { - return _totalPartitionsWithoutTopStateCount; + public long getMissingTopStatePartitionGauge() { + long total = 0; + for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) { + total += entry.getValue().getMissingTopStatePartitionGauge(); + } + return total; } @Override - public long getTotalExternalViewIdealStateMismatchPartitionCount() { - return _totalExternalViewIdealStateMismatchPartitionCount; - } - - synchronized void applyDeltaToTotalPartitionCount(long delta) { - _totalPartitionCount += delta; - } - - synchronized void applyDeltaToTotalErrorPartitionCount(long delta) { - _totalErrorPartitionCount += delta; - } - - synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long delta) { - _totalPartitionsWithoutTopStateCount += delta; - } - - synchronized void applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta) { - _totalExternalViewIdealStateMismatchPartitionCount += delta; + public long getDifferenceWithIdealStateGauge() { + long total = 0; + for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) { + total += entry.getValue().getDifferenceWithIdealStateGauge(); + } + return total; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java index 6ace495..81600cb 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java @@ -84,20 +84,20 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider { /** * @return number of all partitions in this cluster */ - long getTotalPartitionCount(); + long getTotalPartitionGauge(); /** * @return number of all partitions in this cluster that have errors */ - long getTotalErrorPartitionCount(); + long getErrorPartitionGauge(); /** * @return number of all partitions in this cluster without any top-state replicas */ - long getTotalPartitionsWithoutTopStateCount(); + long getMissingTopStatePartitionGauge(); /** * @return number of all partitions in this cluster whose ExternalView and IdealState have discrepancies */ - long getTotalExternalViewIdealStateMismatchPartitionCount(); + long getDifferenceWithIdealStateGauge(); } http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index 125257b..d4b46b1 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -55,15 +55,16 @@ public class ResourceMonitor extends DynamicMBeanProvider { private SimpleDynamicMetric<Long> _successTopStateHandoffCounter; private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter; private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration; - private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge; private SimpleDynamicMetric<Long> _totalMessageReceived; + // Histograms + private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge; + private String _tag = ClusterStatusMonitor.DEFAULT_TAG; private long _lastResetTime; private final String _resourceName; private final String _clusterName; private final ObjectName _initObjectName; - private ClusterStatusMonitor _clusterStatusMonitor; @Override public ResourceMonitor register() throws JMException { @@ -89,70 +90,48 @@ public class ResourceMonitor extends DynamicMBeanProvider { return this; } - @Override - public synchronized void unregister() { - super.unregister(); - // Also remove metrics propagated to aggregate metrics in ClusterStatusMonitor - if (_clusterStatusMonitor != null) { - _clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue()); - _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue()); - _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount( - -_externalViewIdealStateDiff.getValue()); - _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue()); - } - } - public enum MonitorState { TOP_STATE } public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) { - this(null, clusterName, resourceName, objectName); - } - - public ResourceMonitor(ClusterStatusMonitor clusterStatusMonitor, String clusterName, String resourceName, - ObjectName objectName) { - if (clusterStatusMonitor == null) { - _logger.warn("ResourceMonitor initialized without a reference to ClusterStatusMonitor (null): metrics will not " - + "be aggregated at the cluster level."); - } - _clusterStatusMonitor = clusterStatusMonitor; _clusterName = clusterName; _resourceName = resourceName; _initObjectName = objectName; - _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l); + _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L); _numLoadRebalanceThrottledPartitions = - new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l); + new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L); _numRecoveryRebalanceThrottledPartitions = - new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l); + new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0L); _numPendingLoadRebalancePartitions = - new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l); + new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L); _numPendingRecoveryRebalancePartitions = - new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l); - _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l); + new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L); + _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L); _numLessMinActiveReplicaPartitions = - new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l); - _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l); - _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l); - _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0l); - _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l); + new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L); + _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0L); + _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0L); + _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L); + _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L); _partitionTopStateHandoffDurationGauge = new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram( new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS))); - _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l); + _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L); _maxSinglePartitionTopStateHandoffDuration = - new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l); - _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l); - _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l); + new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L); + _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0L); + _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L); _successfulTopStateHandoffDurationCounter = - new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l); + new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L); } @Override public String getSensorName() { - return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, _resourceName); + return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, + _tag, _resourceName); } public long getPartitionGauge() { @@ -293,17 +272,6 @@ public class ResourceMonitor extends DynamicMBeanProvider { } } - // Update cluster-level aggregate metrics in ClusterStatusMonitor - if (_clusterStatusMonitor != null) { - _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() - _numOfPartitions.getValue()); - _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount( - numOfErrorPartitions - _numOfErrorPartitions.getValue()); - _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount( - numOfDiff - _externalViewIdealStateDiff.getValue()); - _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount( - (partitions.size() - numOfPartitionWithTopState) - _numNonTopStatePartitions.getValue()); - } - // Update resource-level metrics _numOfPartitions.updateValue((long) partitions.size()); _numOfErrorPartitions.updateValue(numOfErrorPartitions); @@ -320,21 +288,18 @@ public class ResourceMonitor extends DynamicMBeanProvider { } private void resetGauges() { - // Disable reset for the following gauges: - // 1) Need the previous values for these gauges to compute delta for cluster-level metrics. - // 2) These four gauges are reset every time updateResource is called anyway. - //_numOfErrorPartitions.updateValue(0l); - //_numNonTopStatePartitions.updateValue(0l); - //_externalViewIdealStateDiff.updateValue(0l); - //_numOfPartitionsInExternalView.updateValue(0l); + _numOfErrorPartitions.updateValue(0L); + _numNonTopStatePartitions.updateValue(0L); + _externalViewIdealStateDiff.updateValue(0L); + _numOfPartitionsInExternalView.updateValue(0L); // The following gauges are computed each call to updateResource by way of looping so need to be reset. - _numLessMinActiveReplicaPartitions.updateValue(0l); - _numLessReplicaPartitions.updateValue(0l); - _numPendingRecoveryRebalancePartitions.updateValue(0l); - _numPendingLoadRebalancePartitions.updateValue(0l); - _numRecoveryRebalanceThrottledPartitions.updateValue(0l); - _numLoadRebalanceThrottledPartitions.updateValue(0l); + _numLessMinActiveReplicaPartitions.updateValue(0L); + _numLessReplicaPartitions.updateValue(0L); + _numPendingRecoveryRebalancePartitions.updateValue(0L); + _numPendingLoadRebalancePartitions.updateValue(0L); + _numRecoveryRebalanceThrottledPartitions.updateValue(0L); + _numLoadRebalanceThrottledPartitions.updateValue(0L); } public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) { @@ -398,8 +363,8 @@ public class ResourceMonitor extends DynamicMBeanProvider { public void resetMaxTopStateHandoffGauge() { if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) { - _maxSinglePartitionTopStateHandoffDuration.updateValue(0l); + _maxSinglePartitionTopStateHandoffDuration.updateValue(0L); _lastResetTime = System.currentTimeMillis(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java index ba3b654..dfe5016 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java @@ -61,10 +61,10 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase { private static final int NUM_PARTITIONS = 5; private static final int NUM_REPLICAS = 3; - private static final String PARTITION_COUNT = "TotalPartitionCount"; - private static final String ERROR_PARTITION_COUNT = "TotalErrorPartitionCount"; - private static final String WITHOUT_TOPSTATE_COUNT = "TotalPartitionsWithoutTopStateCount"; - private static final String IS_EV_MISMATCH_COUNT = "TotalExternalViewIdealStateMismatchPartitionCount"; + private static final String PARTITION_COUNT = "TotalPartitionGauge"; + private static final String ERROR_PARTITION_COUNT = "ErrorPartitionGauge"; + private static final String WITHOUT_TOPSTATE_COUNT = "MissingTopStatePartitionGauge"; + private static final String IS_EV_MISMATCH_COUNT = "DifferenceWithIdealStateGauge"; private static final int START_PORT = 12918; private static final String STATE_MODEL = "MasterSlave"; @@ -82,15 +82,11 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase { public void beforeClass() throws Exception { System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } _setupTool = new ClusterSetup(ZK_ADDR); - // setup storage cluster _setupTool.addCluster(CLUSTER_NAME, true); _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, STATE_MODEL); + for (int i = 0; i < NUM_PARTICIPANTS; i++) { String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); @@ -145,7 +141,7 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase { } @Test - public void testAggregateMetrics() throws InterruptedException { + public void testAggregateMetrics() throws Exception { // Everything should be up and running initially with 5 total partitions updateMetrics(); Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);
