This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e1b5ea53 Fixing the flaky topology migration check and 
ConstraintBasedAlgorithm replicaHash computation (#2998)
4e1b5ea53 is described below

commit 4e1b5ea5386b85035f48e4245bfbbe3e065bef26
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Jan 17 15:54:09 2025 -0800

    Fixing the flaky topology migration check and ConstraintBasedAlgorithm 
replicaHash computation (#2998)
    
    - Fixing the flaky topology migration check, which should isolate shuffling 
to a single instance tag. After investigation, ConstraintBasedAlgorithm needed a 
fix to compute the replica hash using only the assignable instances for a given 
instance tag. This ensures that cluster topology changes to instances with 
specific tags only affect resources with those tags.
    - Fix the incorrect condition that decides whether to set the swap-in 
instance's state in BestPossibleStateCalcStage: check the selected state instead 
of the stateMap, so null values are never set for state.
---
 .../constraints/ConstraintBasedAlgorithm.java      | 10 ++--
 .../rebalancer/waged/model/ClusterModel.java       | 26 ++++++++++
 .../stages/BestPossibleStateCalcStage.java         |  2 +-
 .../controller/TestTopologyMigration.java          | 55 +++++++++++++---------
 4 files changed, 68 insertions(+), 25 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index e7510987a..b03af0771 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -22,7 +22,6 @@ package 
org.apache.helix.controller.rebalancer.waged.constraints;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -31,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
@@ -218,7 +216,13 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
           .containsKey(replica.getResourceName());
       _isInBaselineAssignment =
           
clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
-      _replicaHash = Objects.hash(replica.toString(), 
clusterModel.getAssignableLogicalIds());
+
+      // _replicaHash is used to randomize the replicas order so that the same 
replicas are not
+      // always moved in each rebalance. We only use instances which satisfy 
the replica's instance
+      // group tag to calculate the hash code because topology changes to a 
single instance group tag
+      // should be isolated from other instance group tags. Assignable replica 
ordering changes only
+      // change when the topology of the instance group tag changes.
+      _replicaHash = Objects.hash(replica.toString(), 
clusterModel.getAssignableNodesForInstanceGroupTag(replica.getResourceInstanceGroupTag()));
       computeScore(overallClusterRemainingCapacityMap);
     }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 7ef503e01..6ab675789 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -20,6 +20,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
  */
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -38,6 +40,7 @@ public class ClusterModel {
   private final Map<String, Map<String, AssignableReplica>> 
_assignableReplicaIndex;
   private final Map<String, AssignableNode> _assignableNodeMap;
   private final Set<String> _assignableNodeLogicalIds;
+  private final Map<String, Set<String>> _assignableLogicalIdsByInstanceTag;
 
   /**
    * @param clusterContext         The initialized cluster context.
@@ -64,6 +67,15 @@ public class ClusterModel {
     _assignableNodeLogicalIds =
         assignableNodes.parallelStream().map(AssignableNode::getLogicalId)
             .collect(Collectors.toSet());
+
+    // Index all the instances by their instance tags
+    _assignableLogicalIdsByInstanceTag = new HashMap<>();
+    assignableNodes.forEach(node -> {
+      node.getInstanceTags().forEach(tag -> {
+        _assignableLogicalIdsByInstanceTag.computeIfAbsent(tag, key -> new 
HashSet<>())
+            .add(node.getLogicalId());
+      });
+    });
   }
 
   public ClusterContext getContext() {
@@ -78,6 +90,20 @@ public class ClusterModel {
     return _assignableNodeLogicalIds;
   }
 
+  /**
+   * Get the assignable nodes for the given instance tag.
+   * If the instance tag is null, return all the assignable nodes.
+   *
+   * @param instanceTag The instance tag.
+   * @return The set of assignable logical IDs.
+   */
+  public Set<String> getAssignableNodesForInstanceGroupTag(String instanceTag) 
{
+    if (instanceTag == null) {
+      return getAssignableLogicalIds();
+    }
+    return _assignableLogicalIdsByInstanceTag.getOrDefault(instanceTag, 
getAssignableLogicalIds());
+  }
+
   public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
     return _assignableReplicaMap;
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index f2a30a525..152c766e4 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -232,7 +232,7 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
                 
cache.getStateModelDef(resourceMap.get(resourceName).getStateModelDefRef()),
                 stateMap, swapInToSwapOutInstancePairs.get(swapInInstance), 
swapInInstance, resourceName,
                 partition.getPartitionName(), cache);
-            if (stateMap != null) {
+            if (selectedState != null) {
               bestPossibleStateOutput.setState(resourceName, partition, 
swapInInstance,
                   selectedState);
             }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
index 9a076ece3..827c4ce15 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
@@ -31,8 +31,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.examples.LeaderStandbyStateModelFactory;
@@ -40,12 +38,10 @@ import 
org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -55,6 +51,7 @@ import org.testng.annotations.Test;
 public class TestTopologyMigration extends ZkTestBase {
   private static final int START_PORT = 12918; // Starting port for mock 
participants
   private static int _nextStartPort = START_PORT; // Incremental port for 
participants
+  private static final String RESOURCE_PREFIX = "TestDB"; // Prefix for 
resource names
   private static final String TEST_CAPACITY_KEY = "TestCapacityKey";
   private static final int TEST_CAPACITY_VALUE = 100; // Default instance 
capacity for testing
   private static final String RACK = "rack"; // Rack identifier in topology
@@ -65,12 +62,12 @@ public class TestTopologyMigration extends ZkTestBase {
   // Initial topology format
   private static final String MIGRATED_TOPOLOGY =
       String.format("/%s/%s/%s", MZ, HOST, APPLICATION_INSTANCE_ID); // New 
topology format
-  private static final int INIT_ZONE_COUNT = 12; // Initial zone count
-  private static final int MIGRATE_ZONE_COUNT = 6; // Zone count post-migration
+  private static final int INIT_ZONE_COUNT = 6; // Initial zone count
+  private static final int MIGRATE_ZONE_COUNT = 3; // Zone count post-migration
   private static final int RESOURCE_COUNT = 2; // Number of resources in the 
cluster
-  private static final int INSTANCES_PER_RESOURCE = 12; // Number of instances 
per resource
-  private static final int PARTITIONS = 3; // Number of partitions
-  private static final int REPLICA = 6; // Number of replicas
+  private static final int INSTANCES_PER_RESOURCE = 6; // Number of instances 
per resource
+  private static final int PARTITIONS = 10; // Number of partitions
+  private static final int REPLICA = 3; // Number of replicas
   private static final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L;
   // Delay time for resource rebalance
 
@@ -102,12 +99,15 @@ public class TestTopologyMigration extends ZkTestBase {
     // Set up cluster configuration and participants
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
     setupClusterConfig(INIT_TOPOLOGY, RACK);
-    setupInitResourcesAndParticipants();
 
     // Initialize cluster verifier for validating state
     _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        
.setResources(_allDBs).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
         .build();
+
+    // Setup the participants and resources for the test
+    setupInitParticipants();
+    setupInitResources();
   }
 
   /**
@@ -150,9 +150,9 @@ public class TestTopologyMigration extends ZkTestBase {
   /**
    * Sets up initial resources and mock participants for the cluster.
    */
-  private void setupInitResourcesAndParticipants() throws Exception {
+  private void setupInitParticipants() throws Exception {
     for (int i = 0; i < RESOURCE_COUNT; i++) {
-      String dbName = "TestDB_" + i;
+      String dbName = RESOURCE_PREFIX + i;
 
       // Create and start participants for the resource
       for (int j = 0; j < INSTANCES_PER_RESOURCE; j++) {
@@ -169,15 +169,20 @@ public class TestTopologyMigration extends ZkTestBase {
         _nextStartPort++;
         _participants.add(participant);
       }
+    }
+  }
 
-      // Set up IdealState for the resource
+  private void setupInitResources() throws Exception {
+    setAndVerifyMaintenanceMode(true);
+    for (int i = 0; i < RESOURCE_COUNT; i++) {
+      String dbName = RESOURCE_PREFIX + i;
+      _allDBs.add(dbName);
       IdealState is = createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
           BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, 
REPLICA, REPLICA - 1);
-      is.setResourceGroupName(dbName);
+      is.setInstanceGroupTag(dbName);
       
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
dbName, is);
-
-      _allDBs.add(dbName);
     }
+    setAndVerifyMaintenanceMode(false);
   }
 
   /**
@@ -186,8 +191,9 @@ public class TestTopologyMigration extends ZkTestBase {
   private MockParticipantManager createParticipant(String participantName) 
throws Exception {
     MockParticipantManager participant =
         new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, 
null);
-    participant.getStateMachineEngine()
-        .registerStateModelFactory("LeaderStandby", new 
LeaderStandbyStateModelFactory());
+    LeaderStandbyStateModelFactory factory = new 
LeaderStandbyStateModelFactory();
+    factory.setInstanceName(participantName);
+    
participant.getStateMachineEngine().registerStateModelFactory("LeaderStandby", 
factory);
     return participant;
   }
 
@@ -198,6 +204,7 @@ public class TestTopologyMigration extends ZkTestBase {
   public void testTopologyMigrationByResourceGroup() throws Exception {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
+    System.out.println("Capturing initial external views");
     // Step 1: Migrate to new topology in maintenance mode
     Map<String, ExternalView> originalEVs = getEVs();
     List<InstanceConfig> instanceConfigs =
@@ -205,9 +212,14 @@ public class TestTopologyMigration extends ZkTestBase {
             instanceName -> _gSetupTool.getClusterManagementTool()
                 .getInstanceConfig(CLUSTER_NAME, 
instanceName)).collect(Collectors.toList());
 
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    System.out.println("Setting MM to true");
     setAndVerifyMaintenanceMode(true);
     setupClusterConfig(MIGRATED_TOPOLOGY, MZ);
     migrateInstanceConfigTopology(instanceConfigs);
+    validateNoShufflingOccurred(originalEVs, null);
+    System.out.println("Setting MM to false");
     setAndVerifyMaintenanceMode(false);
 
     // Verify cluster did not have shuffling anywhere after
@@ -216,9 +228,10 @@ public class TestTopologyMigration extends ZkTestBase {
 
     // Step 2: Update domain values for one resource group at a time
     for (String updatingDb : _allDBs) {
+      System.out.println("Begin resource group migration for: " + updatingDb);
       Map<String, ExternalView> preMigrationEVs = getEVs();
       setAndVerifyMaintenanceMode(true);
-      migrateDomainForResourceGroup(updatingDb);
+      migrateDomainForInstanceTag(updatingDb);
       setAndVerifyMaintenanceMode(false);
 
       // Verify cluster only had shuffling in the resource group that was 
updated
@@ -314,7 +327,7 @@ public class TestTopologyMigration extends ZkTestBase {
     }
   }
 
-  private void migrateDomainForResourceGroup(String resourceGroup) throws 
Exception {
+  private void migrateDomainForInstanceTag(String resourceGroup) throws 
Exception {
     int instanceIndex = 0;
     for (MockParticipantManager participant : _participants) {
       InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool()

Reply via email to