This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a170c71 Prevent controller from dropping replicas when best possible 
fails to calculate assignment (#3034)
02a170c71 is described below

commit 02a170c7106081108f0563fc09640f7a67a603a9
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Sep 17 11:28:20 2025 -0700

    Prevent controller from dropping replicas when best possible fails to 
calculate assignment (#3034)
    
    * prevent DROPPED messages when mapping cannot be computed
    
    * add test for CRUSHED resource
    
    * add message generation phase test
    
    * fix test
    
    * respond to feedback
---
 .../controller/stages/MessageGenerationPhase.java  |  7 +-
 .../controller/stages/TestRebalancePipeline.java   | 67 +++++++++++++++
 .../TestPreserveAssignmentsOnRebalanceFailure.java | 98 ++++++++++++++++++++++
 3 files changed, 171 insertions(+), 1 deletion(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 0868ac6d6..ca6143100 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -154,8 +154,13 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
       // resourceStateMap. This instance may not have had been dropped by the 
rebalance strategy.
       // This check is required to ensure that the instances removed from the 
ideal state stateMap
       // are properly dropped.
+      // This should only solve for instance operation case where the instance 
is removed from the statemap but there
+      // are still valid assignments in the mapping. We should not consider 
case where there is no mapping at all for
+      // the resource, which can occur on a rebalance failure. If the resource 
has been removed, the partition has
+      // been removed, or the replication factor has been reduced the BP will 
contain the DROPPED states. See method
+      // AbstractRebalancer.computeBestPossibleMap - drops replica that is in 
current state but not in preference list.
       for (String instance : currentStateMap.keySet()) {
-        if (!instanceStateMap.containsKey(instance)) {
+        if (!instanceStateMap.isEmpty() && 
!instanceStateMap.containsKey(instance)) {
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
         }
       }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index d3c41018c..6a5e4a6de 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -37,6 +37,8 @@ import org.apache.helix.ZkUnitTestBase;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import 
org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -645,6 +647,71 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     deleteCluster(clusterName);
   }
 
+  @Test
+  public void testNoMessagesSentOnNoResourceMapping() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+
+    admin.addCluster(clusterName);
+
+    final String resourceName = "testResource_" + methodName;
+    final String partitionName = resourceName + "_0";
+
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0});
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new 
int[] {
+        0
+    });
+    int numPartition = 3;
+    _gSetupTool.addResourceToCluster(clusterName, resourceName, numPartition, 
"LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), 
CrushEdRebalanceStrategy.class.getName());
+    IdealState idealStateOne =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, 
resourceName);
+    
idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, 
resourceName, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(clusterName, resourceName, 1);
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<>(_gZkClient));
+    DummyClusterManager manager =
+        new DummyClusterManager(clusterName, accessor, 
Long.toHexString(_gZkClient.getSessionId()));
+    ClusterEvent event = new ClusterEvent(clusterName, 
ClusterEventType.OnDemandRebalance);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    // Add empty best possible output to mimic no calculations being made
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), new 
BestPossibleStateOutput());
+    rebalancePipeline.addStage(new MessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new ResourceMessageDispatchStage());
+
+    // Set currentState
+    setCurrentState(clusterName, "localhost_0", resourceName, partitionName,
+        liveInstances.get(0).getEphemeralOwner(), "LEADER");
+
+    runPipeline(event, dataRefresh, false);
+
+    runPipeline(event, rebalancePipeline, true);
+
+    // Assert no messages are being sent
+    MessageOutput msgThrottleOutput = 
event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
+    List<Message> messages =
+        msgThrottleOutput.getMessages(resourceName, new 
Partition(partitionName));
+    Assert.assertTrue(messages.isEmpty());
+  }
+
   protected void setCurrentState(String clusterName, String instance, String 
resourceGroupName,
       String resourceKey, String sessionId, String state) {
     setCurrentState(clusterName, instance, resourceGroupName, resourceKey, 
sessionId, state, false);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
new file mode 100644
index 000000000..3e9969ae8
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestPreserveAssignmentsOnRebalanceFailure.java
@@ -0,0 +1,98 @@
+package org.apache.helix.integration;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestPreserveAssignmentsOnRebalanceFailure extends ZkTestBase {
+
+  private final String CLUSTER_NAME = TestHelper.getTestClassName() + 
"_cluster";
+  public final int PARTICIPANT_COUNT = 3;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+  private StrictMatchExternalViewVerifier _externalViewVerifier;
+  private BestPossibleExternalViewVerifier _bestPossibleVerifier;
+
+  @BeforeClass
+  public void setup() {
+    System.out.println("Start test " + TestHelper.getTestClassName());
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < PARTICIPANT_COUNT; i++) {
+      String instanceName = "localhost_" + i;
+      addParticipant(CLUSTER_NAME, instanceName);
+      InstanceConfig instanceConfig = 
_configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+      instanceConfig.setDomain("zone=zone" + i);
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, 
instanceConfig);
+    }
+
+    // Enable topology aware rebalance and set expected topology
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setFaultZoneType("zone");
+    clusterConfig.setTopology("/zone");
+    clusterConfig.setTopologyAwareEnabled(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    _externalViewVerifier = new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setDeactivatedNodeAwareness(true)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+            .build();
+    _bestPossibleVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
+            .setZkAddr(ZK_ADDR)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+            .build();
+  }
+
+  // This test verifies that when a mapping cannot be generated for a resource 
(failureResources in
+  // BestPossibleStateCalcStage), the replicas are not dropped
+  @Test
+  public void testPreserveAssignmentsOnRebalanceFailure() {
+    System.out.println("Start test: " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName());
+
+    // Create a CRUSHED resource
+    int numPartition = 3;
+    String firstDB = "firstDB";
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, 
"LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), 
CrushEdRebalanceStrategy.class.getName());
+    IdealState idealStateOne =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
firstDB);
+    idealStateOne.setMinActiveReplicas(2);
+    
idealStateOne.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
firstDB, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+
+    // Wait for cluster to converge and take a snapshot of the ExternalView
+    Assert.assertTrue(_bestPossibleVerifier.verifyByPolling());
+    ExternalView oldEV = 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
firstDB);
+
+    // Add an instance with no domain set to the cluster, this will cause the 
topology aware assignment to fail
+    String badInstance = "bad_instance";
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, badInstance);
+
+    // Assert EV = IS
+    Assert.assertTrue(_externalViewVerifier.verifyByPolling());
+
+    // Check that the new EV (after bad instance added) is the same as the old 
EV (before bad instance added)
+    ExternalView newEV = 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
firstDB);
+    Assert.assertEquals(oldEV, newEV);
+    System.out.println("End test: " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName());
+  }
+}

Reply via email to