Repository: helix Updated Branches: refs/heads/master 074667363 -> e1faf2404
[HELIX-697] Add cluster level metrics in ClusterStatusMonitor Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e1faf240 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e1faf240 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e1faf240 Branch: refs/heads/master Commit: e1faf2404c3bb74aab7c402d76246b41af74fd16 Parents: 0746673 Author: Hunter Lee <[email protected]> Authored: Thu Apr 19 13:33:54 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Thu Apr 19 13:34:11 2018 -0700 ---------------------------------------------------------------------- .../monitoring/mbeans/ClusterStatusMonitor.java | 47 +++- .../mbeans/ClusterStatusMonitorMBean.java | 59 +++-- .../monitoring/mbeans/ResourceMonitor.java | 120 +++++++---- .../dynamicMBeans/DynamicMBeanProvider.java | 4 +- .../mbeans/TestClusterAggregateMetrics.java | 214 +++++++++++++++++++ 5 files changed, 384 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index c7e0fdb..954ae7d 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -50,7 +50,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { static final String WORKFLOW_TYPE_DN_KEY = "workflowType"; static final String JOB_TYPE_DN_KEY = "jobType"; static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT"; - public static final String DEFAULT_TAG = "DEFAULT"; private 
final String _clusterName; @@ -69,9 +68,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { private boolean _rebalanceFailure = false; private AtomicLong _rebalanceFailureCount = new AtomicLong(0L); + // Aggregate metrics from ResourceMonitors + private volatile long _totalPartitionCount = 0; + private volatile long _totalErrorPartitionCount = 0; + private volatile long _totalPartitionsWithoutTopStateCount = 0; + private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0; private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>(); // phaseName -> eventMonitor @@ -457,7 +460,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { if (!_resourceMbeanMap.containsKey(resourceName)) { String beanName = getResourceBeanName(resourceName); ResourceMonitor bean = - new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName)); + new ResourceMonitor(this, _clusterName, resourceName, getObjectName(beanName)); bean.register(); _resourceMbeanMap.put(resourceName, bean); } @@ -777,7 +780,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { _inMaintenance = inMaintenance; } - @Override public long getPaused() { return _paused ? 
1 : 0; @@ -799,4 +801,41 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { public long getRebalanceFailureCounter() { return _rebalanceFailureCount.get(); } + + @Override + public long getTotalPartitionCount() { + return _totalPartitionCount; + } + + @Override + public long getTotalErrorPartitionCount() { + return _totalErrorPartitionCount; + } + + @Override + public long getTotalPartitionsWithoutTopStateCount() { + return _totalPartitionsWithoutTopStateCount; + } + + @Override + public long getTotalExternalViewIdealStateMismatchPartitionCount() { + return _totalExternalViewIdealStateMismatchPartitionCount; + } + + synchronized void applyDeltaToTotalPartitionCount(long delta) { + _totalPartitionCount += delta; + } + + synchronized void applyDeltaToTotalErrorPartitionCount(long delta) { + _totalErrorPartitionCount += delta; + } + + synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long delta) { + _totalPartitionsWithoutTopStateCount += delta; + } + + synchronized void applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta) { + _totalExternalViewIdealStateMismatchPartitionCount += delta; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java index 49d316e..6ace495 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java @@ -22,49 +22,82 @@ package org.apache.helix.monitoring.mbeans; import org.apache.helix.monitoring.SensorNameProvider; public interface ClusterStatusMonitorMBean extends 
SensorNameProvider { - public long getDownInstanceGauge(); - public long getInstancesGauge(); + /** + * @return number of instances that are down (non-live instances) + */ + long getDownInstanceGauge(); + + /** + * @return total number of instances + */ + long getInstancesGauge(); - public long getDisabledInstancesGauge(); + /** + * @return number of disabled instances + */ + long getDisabledInstancesGauge(); - public long getDisabledPartitionsGauge(); + /** + * @return number of disabled partitions + */ + long getDisabledPartitionsGauge(); - public long getRebalanceFailureGauge(); + /** + * @return 1 if rebalance failed; 0 if rebalance did not fail + */ + long getRebalanceFailureGauge(); /** * The max message queue size across all instances including controller * @return */ - public long getMaxMessageQueueSizeGauge(); + long getMaxMessageQueueSizeGauge(); /** * The sum of all message queue sizes for instances in this cluster * @return */ - public long getInstanceMessageQueueBacklog(); + long getInstanceMessageQueueBacklog(); /** * @return 1 if cluster is enabled, otherwise 0 */ - public long getEnabled(); + long getEnabled(); /** - * * @return 1 if cluster is in maintenance mode, otherwise 0 */ - public long getMaintenance(); - + long getMaintenance(); /** - * * @return 1 if cluster is paused, otherwise 0 */ - public long getPaused(); + long getPaused(); /** * The number of failures during rebalance pipeline. 
* @return */ long getRebalanceFailureCounter(); + + /** + * @return number of all partitions in this cluster + */ + long getTotalPartitionCount(); + + /** + * @return number of all partitions in this cluster that have errors + */ + long getTotalErrorPartitionCount(); + + /** + * @return number of all partitions in this cluster without any top-state replicas + */ + long getTotalPartitionsWithoutTopStateCount(); + + /** + * @return number of all partitions in this cluster whose ExternalView and IdealState have discrepancies + */ + long getTotalExternalViewIdealStateMismatchPartitionCount(); } http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index 3d5c579..125257b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -38,17 +38,17 @@ import java.util.*; public class ResourceMonitor extends DynamicMBeanProvider { // Gauges - private SimpleDynamicMetric<Integer> _numOfPartitions; - private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView; - private SimpleDynamicMetric<Integer> _numOfErrorPartitions; - private SimpleDynamicMetric<Integer> _numNonTopStatePartitions; + private SimpleDynamicMetric<Long> _numOfPartitions; + private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView; + private SimpleDynamicMetric<Long> _numOfErrorPartitions; + private SimpleDynamicMetric<Long> _numNonTopStatePartitions; + private SimpleDynamicMetric<Long> _externalViewIdealStateDiff; private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions; private SimpleDynamicMetric<Long> _numLessReplicaPartitions; 
private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions; private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions; private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions; private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions; - private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff; // Counters private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter; @@ -63,6 +63,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { private final String _resourceName; private final String _clusterName; private final ObjectName _initObjectName; + private ClusterStatusMonitor _clusterStatusMonitor; @Override public ResourceMonitor register() throws JMException { @@ -85,16 +86,37 @@ public class ResourceMonitor extends DynamicMBeanProvider { attributeList.add(_partitionTopStateHandoffDurationGauge); attributeList.add(_totalMessageReceived); doRegister(attributeList, _initObjectName); - return this; } + @Override + public synchronized void unregister() { + super.unregister(); + // Also remove metrics propagated to aggregate metrics in ClusterStatusMonitor + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue()); + _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue()); + _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount( + -_externalViewIdealStateDiff.getValue()); + _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue()); + } + } + public enum MonitorState { TOP_STATE } - public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) - throws JMException { + public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) { + this(null, clusterName, resourceName, objectName); + } + + public ResourceMonitor(ClusterStatusMonitor 
clusterStatusMonitor, String clusterName, String resourceName, + ObjectName objectName) { + if (clusterStatusMonitor == null) { + _logger.warn("ResourceMonitor initialized without a reference to ClusterStatusMonitor (null): metrics will not " + + "be aggregated at the cluster level."); + } + _clusterStatusMonitor = clusterStatusMonitor; _clusterName = clusterName; _resourceName = resourceName; _initObjectName = objectName; @@ -130,9 +152,7 @@ public class ResourceMonitor extends DynamicMBeanProvider { @Override public String getSensorName() { - return String - .format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, - _resourceName); + return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, _resourceName); } public long getPartitionGauge() { @@ -199,30 +219,29 @@ public class ResourceMonitor extends DynamicMBeanProvider { } resetGauges(); + if (idealState == null) { - _logger.warn("ideal state is null for " + _resourceName); + _logger.warn("ideal state is null for {}", _resourceName); return; } assert (_resourceName.equals(idealState.getId())); assert (_resourceName.equals(externalView.getId())); - int numOfErrorPartitions = 0; - int numOfDiff = 0; - int numOfPartitionWithTopState = 0; + long numOfErrorPartitions = 0; + long numOfDiff = 0; + long numOfPartitionWithTopState = 0; Set<String> partitions = idealState.getPartitionSet(); - - _numOfPartitions.updateValue(partitions.size()); - int replica; try { replica = Integer.valueOf(idealState.getReplicas()); } catch (NumberFormatException e) { - _logger.info("Unspecified replica count for " + _resourceName + ", skip updating the ResourceMonitor Mbean: " + idealState.getReplicas()); + _logger.info("Unspecified replica count for {}, skip updating the ResourceMonitor Mbean: {}", _resourceName, + idealState.getReplicas()); return; } catch (Exception ex) { - _logger.warn("Failed to get replica count for " + _resourceName + ", cannot update the 
ResourceMonitor Mbean."); + _logger.warn("Failed to get replica count for {}, cannot update the ResourceMonitor Mbean.", _resourceName); return; } @@ -273,9 +292,23 @@ public class ResourceMonitor extends DynamicMBeanProvider { .updateValue(_numLessMinActiveReplicaPartitions.getValue() + 1); } } + + // Update cluster-level aggregate metrics in ClusterStatusMonitor + if (_clusterStatusMonitor != null) { + _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() - _numOfPartitions.getValue()); + _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount( + numOfErrorPartitions - _numOfErrorPartitions.getValue()); + _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount( + numOfDiff - _externalViewIdealStateDiff.getValue()); + _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount( + (partitions.size() - numOfPartitionWithTopState) - _numNonTopStatePartitions.getValue()); + } + + // Update resource-level metrics + _numOfPartitions.updateValue((long) partitions.size()); _numOfErrorPartitions.updateValue(numOfErrorPartitions); _externalViewIdealStateDiff.updateValue(numOfDiff); - _numOfPartitionsInExternalView.updateValue(externalView.getPartitionSet().size()); + _numOfPartitionsInExternalView.updateValue((long) externalView.getPartitionSet().size()); _numNonTopStatePartitions.updateValue(_numOfPartitions.getValue() - numOfPartitionWithTopState); String tag = idealState.getInstanceGroupTag(); @@ -287,10 +320,15 @@ public class ResourceMonitor extends DynamicMBeanProvider { } private void resetGauges() { - _numOfErrorPartitions.updateValue(0); - _numNonTopStatePartitions.updateValue(0); - _externalViewIdealStateDiff.updateValue(0); - _numOfPartitionsInExternalView.updateValue(0); + // Disable reset for the following gauges: + // 1) Need the previous values for these gauges to compute delta for cluster-level metrics. + // 2) These four gauges are reset every time updateResource is called anyway. 
+ //_numOfErrorPartitions.updateValue(0l); + //_numNonTopStatePartitions.updateValue(0l); + //_externalViewIdealStateDiff.updateValue(0l); + //_numOfPartitionsInExternalView.updateValue(0l); + + // The following gauges are computed each call to updateResource by way of looping so need to be reset. _numLessMinActiveReplicaPartitions.updateValue(0l); _numLessReplicaPartitions.updateValue(0l); _numPendingRecoveryRebalancePartitions.updateValue(0l); @@ -301,23 +339,23 @@ public class ResourceMonitor extends DynamicMBeanProvider { public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) { switch (monitorState) { - case TOP_STATE: - if (succeeded) { - _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1); - _successfulTopStateHandoffDurationCounter - .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration); - _partitionTopStateHandoffDurationGauge.updateValue(duration); - if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) { - _maxSinglePartitionTopStateHandoffDuration.updateValue(duration); - _lastResetTime = System.currentTimeMillis(); + case TOP_STATE: + if (succeeded) { + _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1); + _successfulTopStateHandoffDurationCounter + .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration); + _partitionTopStateHandoffDurationGauge.updateValue(duration); + if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) { + _maxSinglePartitionTopStateHandoffDuration.updateValue(duration); + _lastResetTime = System.currentTimeMillis(); + } + } else { + _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1); } - } else { - _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1); - } - break; - default: - _logger.warn( - String.format("Wrong monitor state \"%s\" that not supported ", 
monitorState.name())); + break; + default: + _logger.warn( + String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name())); } } http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java index b44c63c..988ba9b 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java @@ -134,7 +134,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr public abstract DynamicMBeanProvider register() throws JMException; /** - * After unregistered, the MBean can't be registered again, a new monitor has be to created. + * After unregistered, the MBean can't be registered again, a new monitor has to be created. 
*/ public synchronized void unregister() { MBeanRegistrar.unregister(_objectName); @@ -176,7 +176,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr @Override public void setAttribute(Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, - ReflectionException { + ReflectionException { // All MBeans are readonly return; } http://git-wip-us.apache.org/repos/asf/helix/blob/e1faf240/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java new file mode 100644 index 0000000..ba3b654 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java @@ -0,0 +1,214 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.Query; +import javax.management.QueryExp; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * This test specifically tests MBean metrics instrumented in ClusterStatusMonitor that aggregate individual + * resource-level metrics into cluster-level figures. + * + * Sets up 3 Participants and 5 partitions with 3 replicas each. The test then monitors how the aggregate numbers change + * when all Participants are disabled and re-enabled, and when the resource is dropped. 
+ * + */ +public class TestClusterAggregateMetrics extends ZkIntegrationTestBase { + + // Configurable values for test setup + private static final int NUM_PARTICIPANTS = 3; + private static final int NUM_PARTITIONS = 5; + private static final int NUM_REPLICAS = 3; + + private static final String PARTITION_COUNT = "TotalPartitionCount"; + private static final String ERROR_PARTITION_COUNT = "TotalErrorPartitionCount"; + private static final String WITHOUT_TOPSTATE_COUNT = "TotalPartitionsWithoutTopStateCount"; + private static final String IS_EV_MISMATCH_COUNT = "TotalExternalViewIdealStateMismatchPartitionCount"; + + private static final int START_PORT = 12918; + private static final String STATE_MODEL = "MasterSlave"; + private static final String TEST_DB = "TestDB"; + private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer(); + private final String CLASS_NAME = getShortClassName(); + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private ClusterSetup _setupTool; + private HelixManager _manager; + private MockParticipantManager[] _participants = new MockParticipantManager[NUM_PARTICIPANTS]; + private ClusterControllerManager _controller; + private Map<String, Object> _beanValueMap = new HashMap<>(); + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); + } + _setupTool = new ClusterSetup(ZK_ADDR); + + // setup storage cluster + _setupTool.addCluster(CLUSTER_NAME, true); + _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, STATE_MODEL); + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + 
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, NUM_REPLICAS); + + // start dummy participants + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + boolean result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + result = ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + // create cluster manager + _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", + InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + } + + /** + * Shutdown order: 1) disconnect the controller 2) disconnect participants. 
+ * + */ + @AfterClass + public void afterClass() { + if (_controller != null && _controller.isConnected()) { + _controller.syncStop(); + } + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + if (_participants[i] != null && _participants[i].isConnected()) { + _participants[i].syncStop(); + } + } + if (_manager != null && _manager.isConnected()) { + _manager.disconnect(); + } + + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testAggregateMetrics() throws InterruptedException { + // Everything should be up and running initially with 5 total partitions + updateMetrics(); + Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L); + Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L); + + // Disable all Participants (instances) + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false); + } + Thread.sleep(500); + updateMetrics(); + Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L); + Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 5L); + Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 5L); + + // Re-enable all Participants (instances) + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, true); + } + Thread.sleep(500); + updateMetrics(); + Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L); + Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L); + 
Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L); + + // Drop the resource and check that all metrics are zero. + _setupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_DB); + Thread.sleep(500); + updateMetrics(); + Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(ERROR_PARTITION_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(WITHOUT_TOPSTATE_COUNT), 0L); + Assert.assertEquals(_beanValueMap.get(IS_EV_MISMATCH_COUNT), 0L); + } + + /** + * Queries for all MBeans from the MBean Server and only looks at the relevant MBean and gets its metric numbers. + * + */ + private void updateMetrics() { + try { + QueryExp exp = Query.match(Query.attr("SensorName"), Query.value("*" + CLUSTER_NAME + "*")); + Set<ObjectInstance> mbeans = + new HashSet<>(ManagementFactory.getPlatformMBeanServer().queryMBeans(new ObjectName("ClusterStatus:*"), exp)); + for (ObjectInstance instance : mbeans) { + ObjectName beanName = instance.getObjectName(); + if (beanName.toString().equals("ClusterStatus:cluster=" + CLUSTER_NAME)) { + MBeanInfo info = _server.getMBeanInfo(beanName); + MBeanAttributeInfo[] infos = info.getAttributes(); + for (MBeanAttributeInfo infoItem : infos) { + Object val = _server.getAttribute(beanName, infoItem.getName()); + _beanValueMap.put(infoItem.getName(), val); + } + } + } + } catch (Exception e) { + // update failed + } + } +}
