This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 02a170c71 Prevent controller from dropping replicas when best possible
fails to calculate assignment (#3034)
02a170c71 is described below
commit 02a170c7106081108f0563fc09640f7a67a603a9
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Sep 17 11:28:20 2025 -0700
Prevent controller from dropping replicas when best possible fails to
calculate assignment (#3034)
* prevent DROPPED messages when mapping cannot be computed
* add test for CRUSHED resource
* add message generation phase test
* fix test
* respond to feedback
---
.../controller/stages/MessageGenerationPhase.java | 7 +-
.../controller/stages/TestRebalancePipeline.java | 67 +++++++++++++++
.../TestPreserveAssignmentsOnRebalanceFailure.java | 98 ++++++++++++++++++++++
3 files changed, 171 insertions(+), 1 deletion(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 0868ac6d6..ca6143100 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -154,8 +154,13 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
// resourceStateMap. This instance may not have had been dropped by the
rebalance strategy.
// This check is required to ensure that the instances removed from the
ideal state stateMap
// are properly dropped.
+ // This should only solve for instance operation case where the instance
is removed from the statemap but there
+ // are still valid assignments in the mapping. We should not consider
case where there is no mapping at all for
+ // the resource, which can occur on a rebalance failure. If the resource
has been removed, the partition has
+ // been removed, or the replication factor has been reduced the BP will
contain the DROPPED states.See method
+ // AbstractRebalancer.computeBestPossibleMap - drops replica that is in
current state but not in preference list.
for (String instance : currentStateMap.keySet()) {
- if (!instanceStateMap.containsKey(instance)) {
+ if (!instanceStateMap.isEmpty() &&
!instanceStateMap.containsKey(instance)) {
instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d3c41018c..6a5e4a6de 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -37,6 +37,8 @@ import org.apache.helix.ZkUnitTestBase;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -645,6 +647,71 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
deleteCluster(clusterName);
}
+  // An empty best-possible output (e.g. after a rebalance failure) must not yield DROPPED messages.
+  @Test
+  public void testNoMessagesSentOnNoResourceMapping() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+
+    admin.addCluster(clusterName);
+
+    final String resourceName = "testResource_" + methodName;
+    final String partitionName = resourceName + "_0";
+
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0});
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[]{0});
+    int numPartition = 3;
+    _gSetupTool.addResourceToCluster(clusterName, resourceName, numPartition, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
+    IdealState idealStateOne =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, resourceName);
+    idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(clusterName, resourceName, 1);
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    DummyClusterManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.OnDemandRebalance);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    // Add empty best possible output to mimic no calculations being made
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), new BestPossibleStateOutput());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new ResourceMessageDispatchStage());
+
+    // Set currentState
+    setCurrentState(clusterName, "localhost_0", resourceName, partitionName,
+        liveInstances.get(0).getEphemeralOwner(), "LEADER");
+
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, true);
+
+    // Assert no messages are being sent
+    MessageOutput msgThrottleOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
+    List<Message> messages =
+        msgThrottleOutput.getMessages(resourceName, new Partition(partitionName));
+    Assert.assertTrue(messages.isEmpty());
+
+    deleteCluster(clusterName);
+  }
+
protected void setCurrentState(String clusterName, String instance, String
resourceGroupName,
String resourceKey, String sessionId, String state) {
setCurrentState(clusterName, instance, resourceGroupName, resourceKey,
sessionId, state, false);
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
b/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
new file mode 100644
index 000000000..3e9969ae8
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
@@ -0,0 +1,160 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that when the controller cannot compute a mapping for a resource (the resource is
+ * reported as failed by BestPossibleStateCalcStage), replicas already placed on participants are
+ * kept rather than dropped.
+ */
+public class TestPreserveAssignmentsOnRebalanceFailure extends ZkTestBase {
+
+  private static final int PARTICIPANT_COUNT = 3;
+
+  private final String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
+  private final List<MockParticipantManager> _participants = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+  private StrictMatchExternalViewVerifier _externalViewVerifier;
+  private BestPossibleExternalViewVerifier _bestPossibleVerifier;
+
+  @BeforeClass
+  public void setup() {
+    System.out.println("Start test " + TestHelper.getTestClassName());
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < PARTICIPANT_COUNT; i++) {
+      String instanceName = "localhost_" + i;
+      // Start and keep a handle on each participant so afterClass() can stop it.
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants.add(participant);
+      // Put every participant in its own zone; the cluster's fault zone type is "zone".
+      InstanceConfig instanceConfig =
+          _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+      instanceConfig.setDomain("zone=zone" + i);
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
+    }
+
+    // Enable topology-aware rebalance and set the expected topology
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setFaultZoneType("zone");
+    clusterConfig.setTopology("/zone");
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _externalViewVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
+        .setZkAddr(ZK_ADDR)
+        .setDeactivatedNodeAwareness(true)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+    _bestPossibleVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
+        .setZkAddr(ZK_ADDR)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // Stop the controller and participant sessions started in setup() and remove the cluster so
+    // that none of them outlive this test class.
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager participant : _participants) {
+      if (participant.isConnected()) {
+        participant.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+
+  // This test verifies that when a mapping cannot be generated for a resource (failureResources in
+  // BestPossibleStateCalcStage), the replicas are not dropped
+  @Test
+  public void testPreserveAssignmentsOnRebalanceFailure() {
+    System.out.println("Start test: " + TestHelper.getTestClassName() + "."
+        + TestHelper.getTestMethodName());
+
+    // Create a CRUSHED resource
+    int numPartition = 3;
+    String firstDB = "firstDB";
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
+    IdealState idealStateOne =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
+    idealStateOne.setMinActiveReplicas(2);
+    idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool()
+        .setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+
+    // Wait for the cluster to converge and snapshot the ExternalView. A null or empty snapshot
+    // would make the final equality check pass vacuously, so guard against it.
+    Assert.assertTrue(_bestPossibleVerifier.verifyByPolling());
+    ExternalView oldEV =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB);
+    Assert.assertNotNull(oldEV);
+    Assert.assertEquals(oldEV.getPartitionSet().size(), numPartition);
+
+    // Add an instance with no domain set to the cluster; this will cause the topology-aware
+    // assignment to fail
+    String badInstance = "bad_instance";
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, badInstance);
+
+    // Assert EV = IS
+    Assert.assertTrue(_externalViewVerifier.verifyByPolling());
+
+    // The ExternalView after adding the bad instance must match the snapshot taken before it
+    ExternalView newEV =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB);
+    Assert.assertEquals(newEV, oldEV);
+    System.out.println("End test: " + TestHelper.getTestClassName() + "."
+        + TestHelper.getTestMethodName());
+  }
+}