This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by
this push:
new e7b4c5386 Exclude on-operation instance from computing min active
replica in WAGED. (#2621)
e7b4c5386 is described below
commit e7b4c5386af43d24cb243538228f73aa6b40ab71
Author: xyuanlu <[email protected]>
AuthorDate: Wed Sep 20 10:58:08 2023 -0700
Exclude on-operation instance from computing min active replica in WAGED.
(#2621)
Exclude on-operation instance from computing min active replica in WAGED.
---
.../rebalancer/waged/WagedRebalancer.java | 15 ++++++--
.../rebalancer/TestInstanceOperation.java | 42 ++++++++++++++++------
2 files changed, 44 insertions(+), 13 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 6c1c4d74d..e717aa996 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,6 +47,7 @@ import
org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
@@ -395,7 +396,8 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
Map<String, ResourceAssignment> currentResourceAssignment,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
- final Set<String> enabledLiveInstances =
clusterData.getEnabledLiveInstances();
+ // TODO: this is a hacky way to filter out on-operation instances. We
should consider redesigning `getEnabledLiveInstances()`.
+ final Set<String> enabledLiveInstances =
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
clusterData.getEnabledLiveInstances());
if (activeNodes.equals(enabledLiveInstances) ||
!requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
return currentResourceAssignment;
@@ -424,6 +426,14 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
}
}
+ private static Set<String> filterOutOnOperationInstances(Map<String,
InstanceConfig> instanceConfigMap,
+ Set<String> nodes) {
+ return nodes.stream()
+ .filter(
+ instance ->
!DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
+ .collect(Collectors.toSet());
+ }
+
/**
* Emergency rebalance is scheduled to quickly handle urgent cases like
reassigning partitions from inactive nodes
* and addressing for partitions failing to meet minActiveReplicas.
@@ -608,7 +618,8 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment ->
{
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ Set<String> enabledLiveInstances =
+ filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
clusterData.getEnabledLiveInstances());
int numReplica =
currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica =
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 6c51d58bb..c9c9c7597 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -15,6 +15,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
@@ -89,9 +90,11 @@ public class TestInstanceOperation extends ZkTestBase {
ClusterConfig clusterConfig =
_configAccessor.getClusterConfig(CLUSTER_NAME);
clusterConfig.stateTransitionCancelEnabled(true);
+ clusterConfig.setDelayRebalaceEnabled(true);
+ clusterConfig.setRebalanceDelayTime(1800000L);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
- createTestDBs(200);
+ createTestDBs(1800000L);
setUpWagedBaseline();
@@ -199,7 +202,7 @@ public class TestInstanceOperation extends ZkTestBase {
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME,
"TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
- REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
+ REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB3_DELAYED_CRUSHED");
// add a resource where downward state transition is slow
createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED",
"MasterSlave",
@@ -338,21 +341,38 @@ public class TestInstanceOperation extends ZkTestBase {
@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+ _participants.get(1).syncStop();
_participants.get(2).syncStop();
- _participants.get(3).syncStop();
- // wait for converge, and set evacuate on instance 0
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
- String evacuateInstanceName = _participants.get(0).getInstanceName();
+ String evacuateInstanceName =
_participants.get(_participants.size()-2).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, evacuateInstanceName,
InstanceConstants.InstanceOperation.EVACUATE);
- Map<String, IdealState> assignment;
- List<String> currentActiveInstances =
- _participantNames.stream().filter(n ->
(!n.equals(evacuateInstanceName) &&
!n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
- TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);},
TestHelper.WAIT_DURATION);
+ Map<String, ExternalView> assignment;
+ // EV should contain all participants, check resources one by one
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ ExternalView ev = assignment.get(resource);
+ for (String partition : ev.getPartitionSet()) {
+ AtomicInteger activeReplicaCount = new AtomicInteger();
+ ev.getStateMap(partition)
+ .values()
+ .stream()
+ .filter(
+ v -> v.equals("MASTER") || v.equals("LEADER") ||
v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
+ "STANDBY"))
+ .forEach(v -> activeReplicaCount.getAndIncrement());
+ Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1);
+
Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName)
&& ev.getStateMap(partition)
+ .get(evacuateInstanceName)
+ .equals("MASTER") && ev.getStateMap(partition)
+ .get(evacuateInstanceName)
+ .equals("LEADER"));
+
+ }
+ }
- _participants.get(3).syncStart();
+ _participants.get(1).syncStart();
_participants.get(2).syncStart();
}