This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 5d7a610 Fix MaxCapacityUsageGauge value not updated (#1464)
5d7a610 is described below
commit 5d7a610cdd9b733af8b38158cdfe04d562081713
Author: Huizhi Lu <[email protected]>
AuthorDate: Tue Oct 13 11:21:53 2020 -0700
Fix MaxCapacityUsageGauge value not updated (#1464)
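The MaxCapacityUsageGauge value was never updated because the
resourceToMonitorMap in CurrentStateComputationStage was always empty: its
filter passed each Map.Entry to idealStateMap::containsKey, which compares the
entry object against resource-name keys and therefore never matches.
This commit fixes the bug by keeping only resources whose ideal state enables
the WAGED rebalancer (via the new WagedValidationUtil.isWagedEnabled() helper,
now shared by WagedRebalancer and BestPossibleStateCalcStage), and adds an
integration test to protect the metrics reporting logic.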
---
.../rebalancer/util/WagedValidationUtil.java | 14 ++++
.../rebalancer/waged/WagedRebalancer.java | 10 +--
.../stages/BestPossibleStateCalcStage.java | 12 ++--
.../stages/CurrentStateComputationStage.java | 5 +-
.../waged/TestWagedRebalancerMetrics.java | 84 ++++++++++++++++++++++
.../waged/model/AbstractTestClusterModel.java | 4 ++
6 files changed, 115 insertions(+), 14 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
index 9742cb1..ac62ed5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
@@ -88,4 +90,16 @@ public class WagedValidationUtil {
}
return partitionCapacity;
}
+
+ /**
+ * Checks whether or not a resource has enabled WAGED rebalancer.
+ *
+ * @param idealState {@code IdealState} of the resource being checked.
+ * @return {@code true} if WAGED is enabled; otherwise, {@code false}.
+ */
+ public static boolean isWagedEnabled(IdealState idealState) {
+ return idealState != null
+ &&
idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+ &&
WagedRebalancer.class.getName().equals(idealState.getRebalancerClassName());
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 89ecc47..4451c1d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -44,6 +44,7 @@ import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.StatefulRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import
org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -598,11 +599,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
private void validateInput(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap) throws HelixRebalanceException {
- Set<String> nonCompatibleResources =
resourceMap.entrySet().stream().filter(resourceEntry -> {
- IdealState is = clusterData.getIdealState(resourceEntry.getKey());
- return is == null ||
!is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
- ||
!WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
- }).map(Map.Entry::getKey).collect(Collectors.toSet());
+ Set<String> nonCompatibleResources = resourceMap.keySet().stream()
+ .filter(resource ->
!WagedValidationUtil.isWagedEnabled(clusterData.getIdealState(resource)))
+ .collect(Collectors.toSet());
+
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
"Input contains invalid resource(s) that cannot be rebalanced by the
WAGED rebalancer. %s",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 029091c..7b346ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -41,6 +41,7 @@ import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -253,13 +254,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
// Find the compatible resources: 1. FULL_AUTO 2. Configured to use the
WAGED rebalancer
- Map<String, Resource> wagedRebalancedResourceMap =
- resourceMap.entrySet().stream().filter(resourceEntry -> {
- IdealState is = cache.getIdealState(resourceEntry.getKey());
- return is != null &&
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
- &&
WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
- }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
- resourceEntry -> resourceEntry.getValue()));
+ Map<String, Resource> wagedRebalancedResourceMap =
resourceMap.entrySet().stream()
+ .filter(resourceEntry ->
+
WagedValidationUtil.isWagedEnabled(cache.getIdealState(resourceEntry.getKey())))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, IdealState> newIdealStates = new HashMap<>();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6c9f245..4af7e27 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -34,6 +34,7 @@ import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -283,10 +284,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
try {
// ResourceToRebalance map also has resources from current states.
- // Only use the resources in ideal states to parse all replicas.
+ // Only use the resources in ideal states that enable WAGED to parse
all replicas.
Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
Map<String, Resource> resourceToMonitorMap =
resourceMap.entrySet().stream()
- .filter(idealStateMap::containsKey)
+ .filter(entry ->
WagedValidationUtil.isWagedEnabled(idealStateMap.get(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, ResourceAssignment> currentStateAssignment =
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index b02c30b..bdc677b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,19 +29,31 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import javax.management.AttributeNotFoundException;
import javax.management.JMException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.TestHelper;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.Pipeline;
import
org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import
org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.mock.MockManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.InstanceMonitor;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
@@ -144,6 +157,77 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
RatioMetric.class).getLastEmittedMetricValue() == 0.0d,
TestHelper.WAIT_DURATION));
}
+ /*
+ * Integration test for WAGED instance capacity metrics.
+ */
+ @Test
+ public void testInstanceCapacityMetrics() throws Exception {
+ final String clusterName = TestHelper.getTestMethodName();
+ final ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+
+ ResourceControllerDataProvider cache = setupClusterDataCache();
+ Map<String, Resource> resourceMap =
cache.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+
+ event.addAttribute(AttributeName.helixmanager.name(), new MockManager());
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+ event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMap);
+ event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
+
+ Pipeline rebalancePipeline = new Pipeline();
+ rebalancePipeline.addStage(new ReadClusterDataStage());
+ rebalancePipeline.addStage(new CurrentStateComputationStage());
+ rebalancePipeline.handle(event);
+
+ final MBeanServerConnection mBeanServer =
ManagementFactory.getPlatformMBeanServer();
+
+ for (String instance : _instances) {
+ String instanceBeanName = String.format("%s=%s,instanceName=%s",
+ ClusterStatusMonitor.CLUSTER_DN_KEY, clusterName, instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+ Assert.assertTrue(TestHelper
+ .verify(() -> mBeanServer.isRegistered(instanceObjectName),
+ TestHelper.WAIT_DURATION));
+
+ // Verify capacity gauge metrics
+ for (Map.Entry<String, Integer> capacityEntry :
_capacityDataMap.entrySet()) {
+ String capacityKey = capacityEntry.getKey();
+ String attributeName = capacityKey + "Gauge";
+ Assert.assertTrue(TestHelper.verify(() -> {
+ try {
+ return (long) mBeanServer.getAttribute(instanceObjectName,
attributeName)
+ == _capacityDataMap.get(capacityKey);
+ } catch (AttributeNotFoundException e) {
+ return false;
+ }
+ }, TestHelper.WAIT_DURATION), "Instance capacity gauge metric is not
found or incorrect!");
+ Assert.assertEquals((long)
mBeanServer.getAttribute(instanceObjectName, attributeName),
+ (long) _capacityDataMap.get(capacityKey));
+ }
+
+ // Verify MaxCapacityUsageGauge
+ Assert.assertTrue(TestHelper.verify(() -> {
+ try {
+ double actualMaxUsage = (double)
mBeanServer.getAttribute(instanceObjectName,
+
InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName());
+ // The values are manually calculated from the capacity configs, to
make the code simple.
+ double expectedMaxUsage = instance.equals(_testInstanceId) ? 0.4 :
0.0;
+
+ return Math.abs(actualMaxUsage - expectedMaxUsage) < 0.000001d;
+ } catch (AttributeNotFoundException e) {
+ return false;
+ }
+ }, TestHelper.WAIT_DURATION), "MaxCapacityUsageGauge is not found or
incorrect");
+ }
+ }
+
@Override
protected ResourceControllerDataProvider setupClusterDataCache() throws
IOException {
ResourceControllerDataProvider testCache = super.setupClusterDataCache();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 6d8b861..8887e87 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -137,6 +137,8 @@ public abstract class AbstractTestClusterModel {
when(testCurrentStateResource1.getStateModelDefRef()).thenReturn("MasterSlave");
when(testCurrentStateResource1.getState(_partitionNames.get(0))).thenReturn("MASTER");
when(testCurrentStateResource1.getState(_partitionNames.get(1))).thenReturn("SLAVE");
+ when(testCurrentStateResource1.getSessionId()).thenReturn(_sessionId);
+
CurrentState testCurrentStateResource2 = Mockito.mock(CurrentState.class);
Map<String, String> partitionStateMap2 = new HashMap<>();
partitionStateMap2.put(_partitionNames.get(2), "MASTER");
@@ -146,6 +148,8 @@ public abstract class AbstractTestClusterModel {
when(testCurrentStateResource2.getStateModelDefRef()).thenReturn("MasterSlave");
when(testCurrentStateResource2.getState(_partitionNames.get(2))).thenReturn("MASTER");
when(testCurrentStateResource2.getState(_partitionNames.get(3))).thenReturn("SLAVE");
+ when(testCurrentStateResource2.getSessionId()).thenReturn(_sessionId);
+
Map<String, CurrentState> currentStatemap = new HashMap<>();
currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);