This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 9a36e71c5 Add auto deregistration of offline participants after
timeout (#2932)
9a36e71c5 is described below
commit 9a36e71c5b099b476e4c02372b3e00f49fc3c432
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Thu Feb 6 11:24:21 2025 -0800
Add auto deregistration of offline participants after timeout (#2932)
Add a new ParticipantDeregistrationStage that removes instances which have
been offline for longer than a customer-set timeout.
This timeout is determined by the value of
PARTICIPANT_DEREGISTRATION_TIMEOUT in the cluster config. If the field is not
set (or is set to a negative value), no participants will be deregistered.
---
.../helix/controller/GenericHelixController.java | 2 +
.../helix/controller/pipeline/AsyncWorkerType.java | 3 +-
.../stages/ParticipantDeregistrationStage.java | 105 ++++++++
.../java/org/apache/helix/model/ClusterConfig.java | 32 ++-
.../java/org/apache/helix/common/ZkTestBase.java | 24 ++
.../stages/TestParticipantDeregistrationStage.java | 266 +++++++++++++++++++++
...sourceWhenRequireDelayedRebalanceOverwrite.java | 8 +-
.../helix/integration/TestForceKillInstance.java | 38 ++-
8 files changed, 448 insertions(+), 30 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c641b4d71..e63981d29 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -88,6 +88,7 @@ import org.apache.helix.controller.stages.ManagementModeStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ParticipantDeregistrationStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
@@ -534,6 +535,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new TargetExteralViewCalcStage());
+ rebalancePipeline.addStage(new ParticipantDeregistrationStage());
// external view generation
Pipeline externalViewPipeline = new Pipeline(pipelineName);
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index a1afb95f2..ecbe7eb0c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -33,5 +33,6 @@ public enum AsyncWorkerType {
ExternalViewComputeWorker,
MaintenanceRecoveryWorker,
TaskJobPurgeWorker,
- CustomizedStateViewComputeWorker
+ CustomizedStateViewComputeWorker,
+ ParticipantDeregistrationWorker
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
new file mode 100644
index 000000000..a4fcab3e1
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
@@ -0,0 +1,105 @@
+package org.apache.helix.controller.stages;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ParticipantHistory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline;
+
+
+/**
+ * Asynchronous controller pipeline stage that deregisters (drops from the cluster) participants
+ * that have been offline for longer than the cluster's PARTICIPANT_DEREGISTRATION_TIMEOUT.
+ *
+ * <p>The stage does nothing unless {@link ClusterConfig#isParticipantDeregistrationEnabled()}
+ * returns true. Offline participants that have not yet reached the timeout cause an on-demand
+ * pipeline run to be scheduled for the earliest moment one of them will.
+ */
+public class ParticipantDeregistrationStage extends AbstractAsyncBaseStage {
+  private static final Logger LOG = LoggerFactory.getLogger(ParticipantDeregistrationStage.class);
+
+  @Override
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.ParticipantDeregistrationWorker;
+  }
+
+  /**
+   * Deregisters every offline participant whose offline time plus the configured timeout is at or
+   * before the start of this stage, then schedules a follow-up pipeline run for the next one due.
+   *
+   * @param event cluster event carrying the HelixManager and the ResourceControllerDataProvider
+   */
+  @Override
+  public void execute(ClusterEvent event) throws Exception {
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    ResourceControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    // Validate before dereferencing: a null manager or cache would otherwise throw an NPE below,
+    // before the checks in deregisterParticipants() are ever reached.
+    if (manager == null || cache == null) {
+      LOG.warn("ParticipantDeregistrationStage skipped because HelixManager or cluster data cache"
+          + " is null.");
+      return;
+    }
+
+    ClusterConfig clusterConfig =
+        manager.getConfigAccessor().getClusterConfig(manager.getClusterName());
+    if (clusterConfig == null || !clusterConfig.isParticipantDeregistrationEnabled()) {
+      LOG.debug("Cluster config is null or participant deregistration is not enabled. "
+          + "Skipping participant deregistration.");
+      return;
+    }
+
+    Map<String, Long> offlineTimeMap = cache.getInstanceOfflineTimeMap();
+    Set<String> liveInstanceNames = cache.getLiveInstances().keySet();
+    long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
+    long stageStartTime = System.currentTimeMillis();
+    Set<String> participantsToDeregister = new HashSet<>();
+    // Long.MAX_VALUE means "no future deregistration to schedule"
+    long nextDeregisterTime = Long.MAX_VALUE;
+
+    for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
+      String instanceName = entry.getKey();
+      Long offlineTime = entry.getValue();
+
+      // Skip if instance is still online, or if its offline time is unknown (avoids unboxing NPE)
+      if (liveInstanceNames.contains(instanceName) || offlineTime == null) {
+        continue;
+      }
+
+      // Saturating add: a very large timeout (e.g. Long.MAX_VALUE) must not wrap around to a
+      // negative time, which would make the instance look overdue and deregister it immediately.
+      long deregisterTime = saturatedAdd(offlineTime, deregisterDelay);
+      if (deregisterTime <= stageStartTime) {
+        // Deregister time is in the past: deregister the instance now
+        participantsToDeregister.add(instanceName);
+      } else {
+        // Otherwise, track the next earliest deregister time
+        nextDeregisterTime = Math.min(nextDeregisterTime, deregisterTime);
+      }
+    }
+
+    deregisterParticipants(manager, cache, participantsToDeregister);
+
+    // Schedule the next deregister check. A saturated (Long.MAX_VALUE) time is never reached, so
+    // there is nothing to schedule for it.
+    if (nextDeregisterTime != Long.MAX_VALUE) {
+      long delay = Math.max(nextDeregisterTime - System.currentTimeMillis(), 0);
+      scheduleOnDemandPipeline(manager.getClusterName(), delay);
+    }
+  }
+
+  /**
+   * Drops each named instance from the cluster via the cluster management tool. Instances without
+   * an InstanceConfig in the cache are skipped; a HelixException from one drop is logged and does
+   * not stop the remaining drops.
+   */
+  private void deregisterParticipants(HelixManager manager, ResourceControllerDataProvider cache,
+      Set<String> instancesToDeregister) {
+    if (manager == null || !manager.isConnected() || cache == null
+        || instancesToDeregister == null) {
+      LOG.warn("ParticipantDeregistrationStage failed due to HelixManager being null or not "
+          + "connected, or missing cluster data!");
+      return;
+    }
+
+    String clusterName = cache.getClusterName();
+    if (instancesToDeregister.isEmpty()) {
+      LOG.debug("There are no instances to deregister from cluster {}", clusterName);
+      return;
+    }
+
+    for (String instanceName : instancesToDeregister) {
+      // Safety check: only drop instances the cache still holds an InstanceConfig for
+      InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName);
+      if (instanceConfig == null) {
+        LOG.debug("Instance config is null for instance {}, skip deregistering the instance",
+            instanceName);
+        continue;
+      }
+
+      try {
+        manager.getClusterManagmentTool().dropInstance(clusterName, instanceConfig);
+        LOG.info("Successfully deregistered instance {} from cluster {}", instanceName,
+            clusterName);
+      } catch (HelixException e) {
+        LOG.warn("Failed to deregister instance {} from cluster {}", instanceName, clusterName, e);
+      }
+    }
+  }
+
+  /**
+   * Returns {@code a + b}, clamped to {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE} instead of
+   * silently wrapping on overflow.
+   */
+  private static long saturatedAdd(long a, long b) {
+    long sum = a + b;
+    // Overflow occurred iff both operands share a sign that the result does not.
+    if (((a ^ sum) & (b ^ sum)) < 0) {
+      return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
+    }
+    return sum;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index edb7a76c6..a77acae35 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -165,7 +165,9 @@ public class ClusterConfig extends HelixProperty {
LAST_ON_DEMAND_REBALANCE_TIMESTAMP,
// List of Preferred scoring keys used in evenness score computation
- PREFERRED_SCORING_KEYS
+ PREFERRED_SCORING_KEYS,
+ // How long offline nodes will stay in the cluster before they are
automatically purged, in milliseconds
+ PARTICIPANT_DEREGISTRATION_TIMEOUT
}
public enum GlobalRebalancePreferenceKey {
@@ -1255,4 +1257,32 @@ public class ClusterConfig extends HelixProperty {
_record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name(),
preferredScoringKeys);
}
+
+ /**
+ * Get the PARTICIPANT_DEREGISTRATION_TIMEOUT for cluster, which determines
how long offline nodes will stay in the
+ * cluster before they are automatically purged. If not set, returns -1.
+ *
+ * @return PARTICIPANT_DEREGISTRATION_TIMEOUT value in milliseconds
+ */
+ public long getParticipantDeregistrationTimeout() {
+ return
_record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(),
+ -1);
+ }
+
+/**
+ * Set the PARTICIPANT_DEREGISTRATION_TIMEOUT for cluster, which determines
how long offline nodes will stay in the
+ * cluster before they are automatically purged.
+ *
+ * @param timeout PARTICIPANT_DEREGISTRATION_TIMEOUT value in milliseconds
+ */
+ public void setParticipantDeregistrationTimeout(long timeout) {
+
_record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(),
timeout);
+ }
+
+ /**
+ * @return true if PARTICIPANT_DEREGISTRATION_TIMEOUT is greater than -1,
false otherwise
+ */
+ public boolean isParticipantDeregistrationEnabled() {
+ return getParticipantDeregistrationTimeout() > -1;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index a26560518..278cdfc7a 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -55,6 +55,7 @@ import
org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -830,4 +831,27 @@ public class ZkTestBase {
return _clusterName;
}
}
+
+  /**
+   * Registers {@code instanceName} in {@code cluster}, then creates a mock participant for it and
+   * starts it with {@code syncStart()}.
+   *
+   * @return the started participant
+   */
+  public MockParticipantManager addParticipant(String cluster, String instanceName) {
+    _gSetupTool.addInstanceToCluster(cluster, instanceName);
+    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instanceName);
+    participant.syncStart();
+    return participant;
+  }
+
+ public void dropParticipant(String cluster, MockParticipantManager
participant) {
+ if (participant == null) {
+ return;
+ }
+
+ try {
+ participant.syncStop();
+ InstanceConfig instanceConfig =
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster,
participant.getInstanceName());
+ _gSetupTool.getClusterManagementTool().dropInstance(cluster,
instanceConfig);
+ } catch (Exception e) {
+ LOG.warn("Error dropping participant " + participant.getInstanceName(),
e);
+ }
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
new file mode 100644
index 000000000..7e9bfabca
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
@@ -0,0 +1,266 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration tests for ParticipantDeregistrationStage: offline participants are dropped once they
+ * exceed the cluster's PARTICIPANT_DEREGISTRATION_TIMEOUT, and config changes re-evaluate them.
+ */
+public class TestParticipantDeregistrationStage extends ZkTestBase {
+  static final long DEREGISTER_TIMEOUT = 5000;
+  // How often assertInstanceRemainsInCluster re-checks cluster membership
+  private static final long POLL_INTERVAL_MS = 100;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private static final int NUM_NODES = 5;
+  private final List<MockParticipantManager> _participants = new ArrayList<>();
+  private HelixAdmin _admin;
+  private HelixDataAccessor _dataAccessor;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + i;
+      addParticipant(CLUSTER_NAME, instanceName);
+    }
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _admin = _gSetupTool.getClusterManagementTool();
+    _dataAccessor = _controller.getHelixDataAccessor();
+
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+  }
+
+  // Asserts that a node will be removed from the cluster after it exceeds the deregister timeout
+  // set in the cluster config
+  @Test
+  public void testParticipantAutoLeavesAfterOfflineTimeout() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    participantToDeregister.syncStop();
+    boolean result = TestHelper.verify(() -> !_admin.getInstancesInCluster(CLUSTER_NAME)
+        .contains(participantToDeregister.getInstanceName()), TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participant should have been deregistered");
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+    System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // Asserts that a node will not be removed from the cluster if it comes back online before the
+  // deregister timeout, and that the timeout is reset so the node is not removed until
+  // (time of last offline + deregister timeout)
+  @Test(dependsOnMethods = "testParticipantAutoLeavesAfterOfflineTimeout")
+  public void testReconnectedParticipantNotDeregisteredWhenLive() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    String instanceName = participantToDeregister.getInstanceName();
+    // Kill instance so deregister is scheduled
+    LiveInstance liveInstance =
+        _dataAccessor.getProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
+    participantToDeregister.syncStop();
+
+    // Sleep for more than half the deregister timeout
+    Thread.sleep(DEREGISTER_TIMEOUT * 3 / 5);
+
+    // Manually recreate live instance so controller thinks it's back online.
+    // This should prevent the node from being deregistered
+    _dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(instanceName),
+        liveInstance);
+
+    // Assert that the instance stays in the cluster for a full deregister timeout
+    assertInstanceRemainsInCluster(instanceName, DEREGISTER_TIMEOUT);
+
+    // Re-kill and assert that the instance is deregistered
+    _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
+    boolean result = TestHelper.verify(
+        () -> !_admin.getInstancesInCluster(CLUSTER_NAME).contains(instanceName),
+        TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+        + participantToDeregister + " Remaining participants in cluster: "
+        + _admin.getInstancesInCluster(CLUSTER_NAME));
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, instanceName);
+    System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // Same assertions as above, but this time the node is re-killed immediately after being added
+  // back
+  @Test(dependsOnMethods = "testReconnectedParticipantNotDeregisteredWhenLive")
+  public void testFlappingParticipantIsNotDeregistered() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    String instanceName = participantToDeregister.getInstanceName();
+    // Kill instance so deregister is scheduled
+    LiveInstance liveInstance =
+        _dataAccessor.getProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
+    participantToDeregister.syncStop();
+
+    // Sleep for more than half the deregister timeout
+    Thread.sleep(DEREGISTER_TIMEOUT * 3 / 5);
+
+    // Manually recreate live instance so controller thinks it's back online, record the node as
+    // online in its participant history, then immediately delete the live instance again
+    _dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(instanceName),
+        liveInstance);
+    ParticipantHistory participantHistory =
+        _dataAccessor.getProperty(_dataAccessor.keyBuilder().participantHistory(instanceName));
+    participantHistory.reportOnline("foo", "bar");
+    _dataAccessor.setProperty(_dataAccessor.keyBuilder().participantHistory(instanceName),
+        participantHistory);
+    _dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceName));
+
+    // Assert that the instance is still in the cluster after the original deregistration time
+    // should have passed
+    assertInstanceRemainsInCluster(instanceName, DEREGISTER_TIMEOUT * 3 / 5);
+
+    // The live instance was already removed above, so the node should be deregistered once the
+    // timeout, measured from its latest offline time, expires
+    boolean result = TestHelper.verify(
+        () -> !_admin.getInstancesInCluster(CLUSTER_NAME).contains(instanceName),
+        TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+        + participantToDeregister + " Remaining participants in cluster: "
+        + _admin.getInstancesInCluster(CLUSTER_NAME));
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, instanceName);
+    System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // Tests that enabling deregistration triggers deregistration of participants that were already
+  // offline for longer than the new timeout
+  @Test(dependsOnMethods = "testFlappingParticipantIsNotDeregistered")
+  public void testDeregisterAfterConfigEnabled() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+
+    // Disable deregistration
+    long testDeregisterTimeout = 1000;
+    setAutoDeregisterConfigs(CLUSTER_NAME, -1);
+
+    // Create and immediately kill participants
+    List<MockParticipantManager> killedParticipants = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MockParticipantManager participantToKill =
+          addParticipant(CLUSTER_NAME, "participants_to_kill_" + i);
+      participantToKill.syncStop();
+      killedParticipants.add(participantToKill);
+    }
+
+    // Sleep so that participant offline time exceeds the deregister timeout about to be enabled
+    Thread.sleep(testDeregisterTimeout);
+    // Trigger on disable --> enable deregister
+    setAutoDeregisterConfigs(CLUSTER_NAME, testDeregisterTimeout);
+
+    // Assert participants have been deregistered
+    boolean result = TestHelper.verify(() -> {
+      List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
+      return killedParticipants.stream()
+          .noneMatch(participant -> instances.contains(participant.getInstanceName()));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+        + killedParticipants + " Remaining participants in cluster: "
+        + _admin.getInstancesInCluster(CLUSTER_NAME));
+
+    // reset cluster state
+    killedParticipants.forEach(participant -> dropParticipant(CLUSTER_NAME, participant));
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+    System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // Tests that shortening the deregister timeout triggers deregistration of participants that now
+  // exceed the new (shorter) timeout
+  @Test(dependsOnMethods = "testDeregisterAfterConfigEnabled")
+  public void testDeregisterAfterConfigTimeoutShortened() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+    long longDeregisterTimeout = 1000L * 60 * 60 * 24;
+    long shortDeregisterTimeout = 1000;
+    setAutoDeregisterConfigs(CLUSTER_NAME, longDeregisterTimeout);
+
+    // Create and immediately kill participants
+    List<MockParticipantManager> killedParticipants = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MockParticipantManager participantToKill =
+          addParticipant(CLUSTER_NAME, "participants_to_kill_" + i);
+      participantToKill.syncStop();
+      killedParticipants.add(participantToKill);
+    }
+
+    // Sleep so that participant offline time exceeds the short deregister timeout
+    Thread.sleep(shortDeregisterTimeout);
+
+    // Trigger on shorten deregister timeout
+    setAutoDeregisterConfigs(CLUSTER_NAME, shortDeregisterTimeout);
+
+    // Assert participants have been deregistered
+    boolean result = TestHelper.verify(() -> {
+      List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
+      return killedParticipants.stream()
+          .noneMatch(participant -> instances.contains(participant.getInstanceName()));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. Participants to deregister: "
+        + killedParticipants + " Remaining participants in cluster: "
+        + _admin.getInstancesInCluster(CLUSTER_NAME));
+
+    // reset cluster state
+    killedParticipants.forEach(participant -> dropParticipant(CLUSTER_NAME, participant));
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+    System.out.println("END " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName()
+        + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Override
+  public void dropParticipant(String clusterName, MockParticipantManager participant) {
+    _participants.remove(participant);
+    super.dropParticipant(clusterName, participant);
+  }
+
+  @Override
+  public MockParticipantManager addParticipant(String clusterName, String instanceName) {
+    MockParticipantManager toAddParticipant = super.addParticipant(clusterName, instanceName);
+    _participants.add(toAddParticipant);
+    return toAddParticipant;
+  }
+
+  // Asserts, for durationMs, that the instance remains registered in the cluster. Polls every
+  // POLL_INTERVAL_MS instead of busy-spinning so the check does not flood ZooKeeper with reads;
+  // the final check happens at or after the deadline.
+  private void assertInstanceRemainsInCluster(String instanceName, long durationMs)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + durationMs;
+    while (true) {
+      Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME).contains(instanceName),
+          "Participant should not have been deregistered");
+      long remaining = deadline - System.currentTimeMillis();
+      if (remaining <= 0) {
+        return;
+      }
+      Thread.sleep(Math.min(POLL_INTERVAL_MS, remaining));
+    }
+  }
+
+  private void setAutoDeregisterConfigs(String clusterName, long timeout) {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setParticipantDeregistrationTimeout(timeout);
+    _configAccessor.setClusterConfig(clusterName, clusterConfig);
+    // Allow participant auto-join so deregistered nodes can re-join when they re-establish
+    // their connection
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+            .forCluster(clusterName).build();
+    _configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
+  }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
index 7c93c33b6..c2b57f390 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
@@ -110,12 +110,10 @@ public class
TestAddResourceWhenRequireDelayedRebalanceOverwrite extends ZkTestB
+ new Date(System.currentTimeMillis()));
}
- private MockParticipantManager addParticipant(String cluster, String
instanceName) {
- _gSetupTool.addInstanceToCluster(cluster, instanceName);
- MockParticipantManager toAddParticipant =
- new MockParticipantManager(ZK_ADDR, cluster, instanceName);
+ @Override
+ public MockParticipantManager addParticipant(String clusterName, String
instanceName) {
+ MockParticipantManager toAddParticipant =
super.addParticipant(clusterName, instanceName);
_participants.add(toAddParticipant);
- toAddParticipant.syncStart();
return toAddParticipant;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
index d7dc44205..c5946a331 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
@@ -132,7 +132,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -166,7 +166,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -203,7 +203,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -236,7 +236,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -271,7 +271,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should have assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -309,7 +309,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -341,7 +341,7 @@ public class TestForceKillInstance extends ZkTestBase {
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceToKillName));
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -375,7 +375,7 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
@@ -411,13 +411,14 @@ public class TestForceKillInstance extends ZkTestBase {
"Instance should not have any assignments");
// Reset state of cluster
- dropParticipant(CLUSTER_NAME, instanceToKillName);
+ dropParticipant(CLUSTER_NAME, instanceToKill);
addParticipant(CLUSTER_NAME, instanceToKillName);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
}
- private MockParticipantManager addParticipant(String cluster, String
instanceName) {
+ @Override
+ public MockParticipantManager addParticipant(String cluster, String
instanceName) {
_gSetupTool.addInstanceToCluster(cluster, instanceName);
MockParticipantManager toAddParticipant =
new MockParticipantManager(ZK_ADDR, cluster, instanceName);
@@ -429,19 +430,10 @@ public class TestForceKillInstance extends ZkTestBase {
return toAddParticipant;
}
- protected void dropParticipant(String cluster, String instanceName) {
- // find mock participant manager with instanceName and remove it from
_mockParticipantManagers.
- MockParticipantManager toRemoveManager = _participants.stream()
- .filter(manager -> manager.getInstanceName().equals(instanceName))
- .findFirst()
- .orElse(null);
- if (toRemoveManager != null) {
- toRemoveManager.syncStop();
- _participants.remove(toRemoveManager);
- }
-
- InstanceConfig instanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName);
- _gSetupTool.getClusterManagementTool().dropInstance(cluster,
instanceConfig);
+ @Override
+ public void dropParticipant(String cluster, MockParticipantManager
participant) {
+ _participants.remove(participant);
+ super.dropParticipant(cluster, participant);
}
private Map<String, ExternalView> getEVs() {