This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 4e1b5ea53 Fixing the flaky topology migration check and
ConstraintBasedAlgorithm replicaHash computation (#2998)
4e1b5ea53 is described below
commit 4e1b5ea5386b85035f48e4245bfbbe3e065bef26
Author: Zachary Pinto <[email protected]>
AuthorDate: Fri Jan 17 15:54:09 2025 -0800
Fixing the flaky topology migration check and ConstraintBasedAlgorithm
replicaHash computation (#2998)
- Fix the flaky topology migration check, which should isolate shuffling
to a single instance tag. Investigation showed that ConstraintBasedAlgorithm
also needed a fix: it must compute the replica hash using only the assignable
instances for a given instance tag. This ensures that cluster topology
changes to instances with specific tags only affect resources with those tags.
- Fix the incorrect condition that decides whether to add the swap-in instance's
state to the stateMap in BestPossibleStateCalcStage, so that a null value can no
longer be set as a state.
---
.../constraints/ConstraintBasedAlgorithm.java | 10 ++--
.../rebalancer/waged/model/ClusterModel.java | 26 ++++++++++
.../stages/BestPossibleStateCalcStage.java | 2 +-
.../controller/TestTopologyMigration.java | 55 +++++++++++++---------
4 files changed, 68 insertions(+), 25 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index e7510987a..b03af0771 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -22,7 +22,6 @@ package
org.apache.helix.controller.rebalancer.waged.constraints;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -31,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
@@ -218,7 +216,13 @@ class ConstraintBasedAlgorithm implements
RebalanceAlgorithm {
.containsKey(replica.getResourceName());
_isInBaselineAssignment =
clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
- _replicaHash = Objects.hash(replica.toString(),
clusterModel.getAssignableLogicalIds());
+
+ // _replicaHash is used to randomize the replicas order so that the same
replicas are not
+ // always moved in each rebalance. We only use instances which satisfy
the replica's instance
+ // group tag to calculate the hash code because topology changes to a
single instance group tag
+ // should be isolated from other instance group tags. Assignable replica
ordering changes only
+ // change when the topology of the instance group tag changes.
+ _replicaHash = Objects.hash(replica.toString(),
clusterModel.getAssignableNodesForInstanceGroupTag(replica.getResourceInstanceGroupTag()));
computeScore(overallClusterRemainingCapacityMap);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 7ef503e01..6ab675789 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -20,6 +20,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
*/
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -38,6 +40,7 @@ public class ClusterModel {
private final Map<String, Map<String, AssignableReplica>>
_assignableReplicaIndex;
private final Map<String, AssignableNode> _assignableNodeMap;
private final Set<String> _assignableNodeLogicalIds;
+ private final Map<String, Set<String>> _assignableLogicalIdsByInstanceTag;
/**
* @param clusterContext The initialized cluster context.
@@ -64,6 +67,15 @@ public class ClusterModel {
_assignableNodeLogicalIds =
assignableNodes.parallelStream().map(AssignableNode::getLogicalId)
.collect(Collectors.toSet());
+
+ // Index all the instances by their instance tags
+ _assignableLogicalIdsByInstanceTag = new HashMap<>();
+ assignableNodes.forEach(node -> {
+ node.getInstanceTags().forEach(tag -> {
+ _assignableLogicalIdsByInstanceTag.computeIfAbsent(tag, key -> new
HashSet<>())
+ .add(node.getLogicalId());
+ });
+ });
}
public ClusterContext getContext() {
@@ -78,6 +90,20 @@ public class ClusterModel {
return _assignableNodeLogicalIds;
}
+ /**
+ * Get the assignable nodes for the given instance tag.
+ * If the instance tag is null, return all the assignable nodes.
+ *
+ * @param instanceTag The instance tag.
+ * @return The set of assignable logical IDs.
+ */
+ public Set<String> getAssignableNodesForInstanceGroupTag(String instanceTag)
{
+ if (instanceTag == null) {
+ return getAssignableLogicalIds();
+ }
+ return _assignableLogicalIdsByInstanceTag.getOrDefault(instanceTag,
getAssignableLogicalIds());
+ }
+
public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
return _assignableReplicaMap;
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index f2a30a525..152c766e4 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -232,7 +232,7 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
cache.getStateModelDef(resourceMap.get(resourceName).getStateModelDefRef()),
stateMap, swapInToSwapOutInstancePairs.get(swapInInstance),
swapInInstance, resourceName,
partition.getPartitionName(), cache);
- if (stateMap != null) {
+ if (selectedState != null) {
bestPossibleStateOutput.setState(resourceName, partition,
swapInInstance,
selectedState);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
index 9a076ece3..827c4ce15 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java
@@ -31,8 +31,6 @@ import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.examples.LeaderStandbyStateModelFactory;
@@ -40,12 +38,10 @@ import
org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -55,6 +51,7 @@ import org.testng.annotations.Test;
public class TestTopologyMigration extends ZkTestBase {
private static final int START_PORT = 12918; // Starting port for mock
participants
private static int _nextStartPort = START_PORT; // Incremental port for
participants
+ private static final String RESOURCE_PREFIX = "TestDB"; // Prefix for
resource names
private static final String TEST_CAPACITY_KEY = "TestCapacityKey";
private static final int TEST_CAPACITY_VALUE = 100; // Default instance
capacity for testing
private static final String RACK = "rack"; // Rack identifier in topology
@@ -65,12 +62,12 @@ public class TestTopologyMigration extends ZkTestBase {
// Initial topology format
private static final String MIGRATED_TOPOLOGY =
String.format("/%s/%s/%s", MZ, HOST, APPLICATION_INSTANCE_ID); // New
topology format
- private static final int INIT_ZONE_COUNT = 12; // Initial zone count
- private static final int MIGRATE_ZONE_COUNT = 6; // Zone count post-migration
+ private static final int INIT_ZONE_COUNT = 6; // Initial zone count
+ private static final int MIGRATE_ZONE_COUNT = 3; // Zone count post-migration
private static final int RESOURCE_COUNT = 2; // Number of resources in the
cluster
- private static final int INSTANCES_PER_RESOURCE = 12; // Number of instances
per resource
- private static final int PARTITIONS = 3; // Number of partitions
- private static final int REPLICA = 6; // Number of replicas
+ private static final int INSTANCES_PER_RESOURCE = 6; // Number of instances
per resource
+ private static final int PARTITIONS = 10; // Number of partitions
+ private static final int REPLICA = 3; // Number of replicas
private static final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L;
// Delay time for resource rebalance
@@ -102,12 +99,15 @@ public class TestTopologyMigration extends ZkTestBase {
// Set up cluster configuration and participants
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
setupClusterConfig(INIT_TOPOLOGY, RACK);
- setupInitResourcesAndParticipants();
// Initialize cluster verifier for validating state
_clusterVerifier = new
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-
.setResources(_allDBs).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
+
+ // Setup the participants and resources for the test
+ setupInitParticipants();
+ setupInitResources();
}
/**
@@ -150,9 +150,9 @@ public class TestTopologyMigration extends ZkTestBase {
/**
* Sets up initial resources and mock participants for the cluster.
*/
- private void setupInitResourcesAndParticipants() throws Exception {
+ private void setupInitParticipants() throws Exception {
for (int i = 0; i < RESOURCE_COUNT; i++) {
- String dbName = "TestDB_" + i;
+ String dbName = RESOURCE_PREFIX + i;
// Create and start participants for the resource
for (int j = 0; j < INSTANCES_PER_RESOURCE; j++) {
@@ -169,15 +169,20 @@ public class TestTopologyMigration extends ZkTestBase {
_nextStartPort++;
_participants.add(participant);
}
+ }
+ }
- // Set up IdealState for the resource
+ private void setupInitResources() throws Exception {
+ setAndVerifyMaintenanceMode(true);
+ for (int i = 0; i < RESOURCE_COUNT; i++) {
+ String dbName = RESOURCE_PREFIX + i;
+ _allDBs.add(dbName);
IdealState is = createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS,
REPLICA, REPLICA - 1);
- is.setResourceGroupName(dbName);
+ is.setInstanceGroupTag(dbName);
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
dbName, is);
-
- _allDBs.add(dbName);
}
+ setAndVerifyMaintenanceMode(false);
}
/**
@@ -186,8 +191,9 @@ public class TestTopologyMigration extends ZkTestBase {
private MockParticipantManager createParticipant(String participantName)
throws Exception {
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10,
null);
- participant.getStateMachineEngine()
- .registerStateModelFactory("LeaderStandby", new
LeaderStandbyStateModelFactory());
+ LeaderStandbyStateModelFactory factory = new
LeaderStandbyStateModelFactory();
+ factory.setInstanceName(participantName);
+
participant.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
factory);
return participant;
}
@@ -198,6 +204,7 @@ public class TestTopologyMigration extends ZkTestBase {
public void testTopologyMigrationByResourceGroup() throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ System.out.println("Capturing initial external views");
// Step 1: Migrate to new topology in maintenance mode
Map<String, ExternalView> originalEVs = getEVs();
List<InstanceConfig> instanceConfigs =
@@ -205,9 +212,14 @@ public class TestTopologyMigration extends ZkTestBase {
instanceName -> _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME,
instanceName)).collect(Collectors.toList());
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ System.out.println("Setting MM to true");
setAndVerifyMaintenanceMode(true);
setupClusterConfig(MIGRATED_TOPOLOGY, MZ);
migrateInstanceConfigTopology(instanceConfigs);
+ validateNoShufflingOccurred(originalEVs, null);
+ System.out.println("Setting MM to false");
setAndVerifyMaintenanceMode(false);
// Verify cluster did not have shuffling anywhere after
@@ -216,9 +228,10 @@ public class TestTopologyMigration extends ZkTestBase {
// Step 2: Update domain values for one resource group at a time
for (String updatingDb : _allDBs) {
+ System.out.println("Begin resource group migration for: " + updatingDb);
Map<String, ExternalView> preMigrationEVs = getEVs();
setAndVerifyMaintenanceMode(true);
- migrateDomainForResourceGroup(updatingDb);
+ migrateDomainForInstanceTag(updatingDb);
setAndVerifyMaintenanceMode(false);
// Verify cluster only had shuffling in the resource group that was
updated
@@ -314,7 +327,7 @@ public class TestTopologyMigration extends ZkTestBase {
}
}
- private void migrateDomainForResourceGroup(String resourceGroup) throws
Exception {
+ private void migrateDomainForInstanceTag(String resourceGroup) throws
Exception {
int instanceIndex = 0;
for (MockParticipantManager participant : _participants) {
InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool()