This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new e0c551d1a Fix waged instance capacity npe on new resource (#2969)
e0c551d1a is described below
commit e0c551d1ab811654f0474643c12ea78c9e90f7f4
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Jan 29 11:15:14 2025 -0800
Fix waged instance capacity npe on new resource (#2969)
Fix waged instance capacity npe on new resource by clearing the WAGED
capacity map whenever there are no WAGED resources in the cluster. This will
prevent a stale map from being used once a new resource is added.
---
.../ResourceControllerDataProvider.java | 8 ++
.../stages/CurrentStateComputationStage.java | 7 ++
.../org/apache/helix/integration/TestWagedNPE.java | 111 +++++++++++++++++++++
3 files changed, 126 insertions(+)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 021aab6a8..b006a4f1e 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -515,6 +515,14 @@ public class ResourceControllerDataProvider extends
BaseControllerDataProvider {
_wagedPartitionWeightProvider = resourceWeightProvider;
}
+ /**
+ * Clears the WAGED algorithm specific instance capacity provider and
resource weight provider.
+ */
+ public void clearWagedCapacityProviders() {
+ _wagedInstanceCapacity = null;
+ _wagedPartitionWeightProvider = null;
+ }
+
/**
* Check and reduce the capacity of an instance for a resource partition
* @param instance - the instance to check
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index da972d682..2814e4062 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -356,6 +356,11 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
CurrentStateOutput currentStateOutput) {
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES.name());
if (skipCapacityCalculation(cache, resourceMap, event)) {
+ // Ensure instance capacity is null if there are no resources. This
prevents using a stale map when all resources
+ // are removed and then a new resource is added.
+ if (resourceMap == null || resourceMap.isEmpty()) {
+ cache.clearWagedCapacityProviders();
+ }
return;
}
@@ -364,7 +369,9 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
.filter(entry ->
WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ // Ensure instance capacity is null if there are no WAGED enabled resources
if (wagedEnabledResourceMap.isEmpty()) {
+ cache.clearWagedCapacityProviders();
return;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java
b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java
new file mode 100644
index 000000000..57930fa71
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestWagedNPE.java
@@ -0,0 +1,111 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
+import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestWagedNPE extends ZkTestBase {
+
+ public static String CLUSTER_NAME = TestHelper.getTestClassName() +
"_cluster";
+ public static int PARTICIPANT_COUNT = 3;
+ public static List<MockParticipantManager> _participants = new ArrayList<>();
+ public static ClusterControllerManager _controller;
+ public static ConfigAccessor _configAccessor;
+
+ @BeforeClass
+ public void beforeClass() {
+ System.out.println("Start test " + TestHelper.getTestClassName());
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < PARTICIPANT_COUNT; i++) {
+ addParticipant("localhost_" + i);
+ }
+
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
+ String testCapacityKey = "TestCapacityKey";
+
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey,
100));
+
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey,
1));
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ }
+
+ // This test was constructed to capture the bug described in issue 2891
+ // https://github.com/apache/helix/issues/2891
+ @Test
+ public void testNPE() throws Exception {
+ int numPartition = 3;
+ BestPossibleExternalViewVerifier verifier =
+ new
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+ // Create 1 WAGED Resource
+ String firstDB = "firstDB";
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition,
"LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO.name(), null);
+ IdealState idealStateOne =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
firstDB);
+ idealStateOne.setMinActiveReplicas(2);
+ idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
firstDB, idealStateOne);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+
+ // Wait for cluster to converge
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // Drop resource
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB);
+
+ // Wait for cluster to converge
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // add instance
+ addParticipant("instance_to_add");
+
+ // Wait for cluster to converge
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // Add a new resource
+ String secondDb = "secondDB";
+ _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new
ResourceConfig(secondDb));
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition,
"LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO.name(), null);
+ IdealState idealStateTwo =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
secondDb);
+ idealStateTwo.setMinActiveReplicas(2);
+ idealStateTwo.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
secondDb, idealStateTwo);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3);
+
+ // Confirm cluster can converge. Cluster will not converge if NPE occurs
during pipeline run
+ Assert.assertTrue(verifier.verifyByPolling());
+ }
+
+ public MockParticipantManager addParticipant(String instanceName) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ _participants.add(participant);
+ return participant;
+ }
+}