This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a36e71c5 Add auto deregistration of offline participants after 
timeout (#2932)
9a36e71c5 is described below

commit 9a36e71c5b099b476e4c02372b3e00f49fc3c432
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Thu Feb 6 11:24:21 2025 -0800

    Add auto deregistration of offline participants after timeout (#2932)
    
    Add new ParticipantDeregistrationStage to handle removing instances that 
have been offline for longer than the customer-set timeout.
    This timeout is determined by the value of 
PARTICIPANT_DEREGISTRATION_TIMEOUT in the cluster config. If the field is not 
set, then no participants will be deregistered.
---
 .../helix/controller/GenericHelixController.java   |   2 +
 .../helix/controller/pipeline/AsyncWorkerType.java |   3 +-
 .../stages/ParticipantDeregistrationStage.java     | 105 ++++++++
 .../java/org/apache/helix/model/ClusterConfig.java |  32 ++-
 .../java/org/apache/helix/common/ZkTestBase.java   |  24 ++
 .../stages/TestParticipantDeregistrationStage.java | 266 +++++++++++++++++++++
 ...sourceWhenRequireDelayedRebalanceOverwrite.java |   8 +-
 .../helix/integration/TestForceKillInstance.java   |  38 ++-
 8 files changed, 448 insertions(+), 30 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index c641b4d71..e63981d29 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -88,6 +88,7 @@ import org.apache.helix.controller.stages.ManagementModeStage;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ParticipantDeregistrationStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
@@ -534,6 +535,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       rebalancePipeline.addStage(new ResourceMessageDispatchStage());
       rebalancePipeline.addStage(new PersistAssignmentStage());
       rebalancePipeline.addStage(new TargetExteralViewCalcStage());
+      rebalancePipeline.addStage(new ParticipantDeregistrationStage());
 
       // external view generation
       Pipeline externalViewPipeline = new Pipeline(pipelineName);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index a1afb95f2..ecbe7eb0c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -33,5 +33,6 @@ public enum AsyncWorkerType {
   ExternalViewComputeWorker,
   MaintenanceRecoveryWorker,
   TaskJobPurgeWorker,
-  CustomizedStateViewComputeWorker
+  CustomizedStateViewComputeWorker,
+  ParticipantDeregistrationWorker
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
new file mode 100644
index 000000000..a4fcab3e1
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
@@ -0,0 +1,105 @@
+package org.apache.helix.controller.stages;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ParticipantHistory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline;
+
+
+public class ParticipantDeregistrationStage extends AbstractAsyncBaseStage {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParticipantDeregistrationStage.class);
+
+  @Override
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.ParticipantDeregistrationWorker;
+  }
+
+  @Override
+  public void execute(ClusterEvent event) throws Exception {
+    HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
+    ClusterConfig clusterConfig = 
manager.getConfigAccessor().getClusterConfig(manager.getClusterName());
+    if (clusterConfig == null || 
!clusterConfig.isParticipantDeregistrationEnabled()) {
+      LOG.debug("Cluster config is null or participant deregistration is not 
enabled. "
+          + "Skipping participant deregistration.");
+      return;
+    }
+
+    ResourceControllerDataProvider cache = 
event.getAttribute(AttributeName.ControllerDataProvider.name());
+    Map<String, Long> offlineTimeMap = cache.getInstanceOfflineTimeMap();
+    long deregisterDelay = clusterConfig.getParticipantDeregistrationTimeout();
+    long stageStartTime = System.currentTimeMillis();
+    Set<String> participantsToDeregister = new HashSet<>();
+    long nextDeregisterTime = -1;
+
+
+    for (Map.Entry<String, Long> entry : offlineTimeMap.entrySet()) {
+      String instanceName = entry.getKey();
+      Long offlineTime = entry.getValue();
+      long deregisterTime = offlineTime + deregisterDelay;
+
+      // Skip if instance is still online
+      if (cache.getLiveInstances().containsKey(instanceName)) {
+        continue;
+      }
+
+      // If deregister time is in the past, deregister the instance
+      if (deregisterTime <= stageStartTime) {
+        participantsToDeregister.add(instanceName);
+      } else {
+        // Otherwise, find the next earliest deregister time
+        nextDeregisterTime = nextDeregisterTime == -1 ? deregisterTime : 
Math.min(nextDeregisterTime, deregisterTime);
+      }
+    }
+
+    deregisterParticipants(manager, cache, participantsToDeregister);
+
+    // Schedule the next deregister task
+    if (nextDeregisterTime != -1) {
+      long delay = Math.max(nextDeregisterTime - System.currentTimeMillis(), 
0);
+      scheduleOnDemandPipeline(manager.getClusterName(), delay);
+    }
+  }
+
+  private void deregisterParticipants(HelixManager manager, 
ResourceControllerDataProvider cache,
+      Set<String> instancesToDeregister) {
+    Set<String> successfullyDeregisteredInstances = new HashSet<>();
+
+    if (manager == null || !manager.isConnected() || cache == null || 
instancesToDeregister == null) {
+      LOG.warn("ParticipantDeregistrationStage failed due to HelixManager 
being null or not connected!");
+      return;
+    }
+
+    if (instancesToDeregister.isEmpty()) {
+      LOG.debug("There are no instances to deregister from cluster {}", 
cache.getClusterName());
+      return;
+    }
+
+    // Perform safety checks before deregistering the instances
+    for (String instanceName : instancesToDeregister) {
+      InstanceConfig instanceConfig = 
cache.getInstanceConfigMap().get(instanceName);
+      if (instanceConfig == null) {
+        LOG.debug("Instance config is null for instance {}, skip deregistering 
the instance", instanceName);
+        continue;
+      }
+
+      try {
+        manager.getClusterManagmentTool().dropInstance(cache.getClusterName(), 
instanceConfig);
+        successfullyDeregisteredInstances.add(instanceName);
+        LOG.info("Successfully deregistered instance {} from cluster {}", 
instanceName, cache.getClusterName());
+      } catch (HelixException e) {
+        LOG.warn("Failed to deregister instance {} from cluster {}", 
instanceName, cache.getClusterName(), e);
+      }
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index edb7a76c6..a77acae35 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -165,7 +165,9 @@ public class ClusterConfig extends HelixProperty {
     LAST_ON_DEMAND_REBALANCE_TIMESTAMP,
 
     // List of Preferred scoring keys used in evenness score computation
-    PREFERRED_SCORING_KEYS
+    PREFERRED_SCORING_KEYS,
+    // How long offline nodes will stay in the cluster before they are 
automatically purged, in milliseconds
+    PARTICIPANT_DEREGISTRATION_TIMEOUT
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -1255,4 +1257,32 @@ public class ClusterConfig extends HelixProperty {
     _record.setListField(ClusterConfigProperty.PREFERRED_SCORING_KEYS.name(),
         preferredScoringKeys);
   }
+
+  /**
+   * Get the PARTICIPANT_DEREGISTRATION_TIMEOUT for cluster, which determines 
how long offline nodes will stay in the
+   * cluster before they are automatically purged. If not set, returns -1.
+   *
+   * @return PARTICIPANT_DEREGISTRATION_TIMEOUT value in milliseconds
+   */
+  public long getParticipantDeregistrationTimeout() {
+    return 
_record.getLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(),
+        -1);
+  }
+
+/**
+ * Set the PARTICIPANT_DEREGISTRATION_TIMEOUT for cluster, which determines 
how long offline nodes will stay in the
+ * cluster before they are automatically purged.
+ *
+ * @param timeout PARTICIPANT_DEREGISTRATION_TIMEOUT value in milliseconds
+ */
+  public void setParticipantDeregistrationTimeout(long timeout) {
+    
_record.setLongField(ClusterConfigProperty.PARTICIPANT_DEREGISTRATION_TIMEOUT.name(),
 timeout);
+  }
+
+  /**
+   * @return true if PARTICIPANT_DEREGISTRATION_TIMEOUT is greater than -1, 
false otherwise
+   */
+  public boolean isParticipantDeregistrationEnabled() {
+    return getParticipantDeregistrationTimeout() > -1;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index a26560518..278cdfc7a 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -55,6 +55,7 @@ import 
org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -830,4 +831,27 @@ public class ZkTestBase {
       return _clusterName;
     }
   }
+
+  public MockParticipantManager addParticipant(String cluster, String 
instanceName) {
+    _gSetupTool.addInstanceToCluster(cluster, instanceName);
+    MockParticipantManager toAddParticipant =
+        new MockParticipantManager(ZK_ADDR, cluster, instanceName);
+    toAddParticipant.syncStart();
+    return toAddParticipant;
+  }
+
+  public void dropParticipant(String cluster, MockParticipantManager 
participant) {
+    if (participant == null) {
+      return;
+    }
+
+    try {
+      participant.syncStop();
+      InstanceConfig instanceConfig =
+          _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, 
participant.getInstanceName());
+      _gSetupTool.getClusterManagementTool().dropInstance(cluster, 
instanceConfig);
+    } catch (Exception e) {
+      LOG.warn("Error dropping participant " + participant.getInstanceName(), 
e);
+    }
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
new file mode 100644
index 000000000..7e9bfabca
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
@@ -0,0 +1,266 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestParticipantDeregistrationStage extends ZkTestBase {
+  final static long DEREGISTER_TIMEOUT = 5000;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private static final int NUM_NODES = 5;
+  private List<MockParticipantManager> _participants = new ArrayList<>();
+  private HelixAdmin _admin;
+  private HelixDataAccessor _dataAccessor;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() {
+    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + i;
+      addParticipant(CLUSTER_NAME, instanceName);
+    }
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    _admin = _gSetupTool.getClusterManagementTool();
+    _dataAccessor = _controller.getHelixDataAccessor();
+
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+  }
+
+  // Asserts that a node will be removed from the cluster after it exceeds the 
deregister timeout set in the cluster config
+  @Test
+  public void testParticipantAutoLeavesAfterOfflineTimeout() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    participantToDeregister.syncStop();
+    boolean result = TestHelper.verify(() -> 
!_admin.getInstancesInCluster(CLUSTER_NAME)
+        .contains(participantToDeregister.getInstanceName()), 
TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participant should have been deregistered");
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+    System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  // Asserts that a node will not be removed from the cluster if it comes back online 
before the deregister timeout
+  // and that the deregister timeout is reset, so the node will not be removed 
until the time
+  // it last went offline + deregister timeout
+  @Test (dependsOnMethods = "testParticipantAutoLeavesAfterOfflineTimeout")
+  public void testReconnectedParticipantNotDeregisteredWhenLive() throws 
Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    // Kill instance so deregister is scheduled
+    LiveInstance liveInstance = _dataAccessor.getProperty(
+        
_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
+    participantToDeregister.syncStop();
+
+    // Sleep for more than half (3/5) of the deregister timeout
+    Thread.sleep(DEREGISTER_TIMEOUT * 3/5);
+
+    // Manually recreate live instance so controller thinks it's back online
+    // This should prevent the node from being deregistered
+    
_dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()),
+        liveInstance);
+
+    // assert that the instance is still in the cluster
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() < startTime + DEREGISTER_TIMEOUT) {
+      Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME)
+          .contains(participantToDeregister.getInstanceName()), "Participant 
should not have been deregistered");
+    }
+
+    // Re kill and assert that the instance is deregistered
+    
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
+    boolean result = TestHelper.verify(() -> 
!_admin.getInstancesInCluster(CLUSTER_NAME)
+        .contains(participantToDeregister.getInstanceName()), 
TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. 
Participants to deregister: "
+        + participantToDeregister + " Remaining participants: in cluster " + 
_admin.getInstancesInCluster(CLUSTER_NAME));
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+    System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  // Same assertions as above but this time the node is re-killed immediately 
after being added back
+  @Test (dependsOnMethods = 
"testReconnectedParticipantNotDeregisteredWhenLive")
+  public void testFlappingParticipantIsNotDeregistered() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    // Kill instance so deregister is scheduled
+    LiveInstance liveInstance = _dataAccessor.getProperty(
+        
_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
+    participantToDeregister.syncStop();
+
+    // Sleep for more than half the deregister timeout
+    Thread.sleep(DEREGISTER_TIMEOUT * 3/5);
+
+    // Manually recreate live instance so controller thinks it's back online, 
then immediately delete
+    
_dataAccessor.setProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()),
+        liveInstance);
+    ParticipantHistory participantHistory = 
_dataAccessor.getProperty(_dataAccessor.keyBuilder()
+        .participantHistory(participantToDeregister.getInstanceName()));
+    participantHistory.reportOnline("foo", "bar");
+    
_dataAccessor.setProperty(_dataAccessor.keyBuilder().participantHistory(participantToDeregister.getInstanceName()),
+        participantHistory);
+
+    
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
+
+    // assert that the instance is still in the cluster after original 
deregistration time should have passed
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() < startTime + (DEREGISTER_TIMEOUT * 
3/5)) {
+      Assert.assertTrue(_admin.getInstancesInCluster(CLUSTER_NAME)
+          .contains(participantToDeregister.getInstanceName()), "Participant 
should not have been deregistered");
+    }
+
+    // Re kill and assert that the instance is deregistered
+    
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(participantToDeregister.getInstanceName()));
+    boolean result = TestHelper.verify(() -> 
!_admin.getInstancesInCluster(CLUSTER_NAME)
+        .contains(participantToDeregister.getInstanceName()), 
TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. 
Participants to deregister: "
+        + participantToDeregister + " Remaining participants: in cluster " + 
_admin.getInstancesInCluster(CLUSTER_NAME));
+
+    dropParticipant(CLUSTER_NAME, participantToDeregister);
+    addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+    System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  // Tests that enabling deregistration triggers deregistration of participants that 
were already offline
+  @Test (dependsOnMethods = "testFlappingParticipantIsNotDeregistered")
+  public void testDeregisterAfterConfigEnabled() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+
+    // Disable auto deregistration (a timeout of -1 turns it off)
+    long testDeregisterTimeout = 1000;
+    setAutoDeregisterConfigs(CLUSTER_NAME, -1);
+
+    // Create and immediately kill participants
+    List<MockParticipantManager> killedParticipants = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, 
"participants_to_kill_" + i);
+      participantToKill.syncStop();
+      killedParticipants.add(participantToKill);
+    }
+
+    // Sleep so that participant offline time exceeds deregister timeout
+    Thread.sleep(testDeregisterTimeout);
+    // Trigger on disable --> enable deregister
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+
+    // Assert participants have been deregistered
+    boolean result = TestHelper.verify(() -> {
+    List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
+      return killedParticipants.stream().noneMatch(participant -> 
instances.contains(participant.getInstanceName()));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. 
Participants to deregister: "
+        + killedParticipants + " Remaining participants: in cluster " + 
_admin.getInstancesInCluster(CLUSTER_NAME));
+
+    // reset cluster state
+    killedParticipants.forEach(participant -> {
+      dropParticipant(CLUSTER_NAME, participant);
+    });
+    System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  // Tests shortening deregister timeout will trigger deregister and also 
deregister participants that now exceed
+  // the new (shorter) timeout
+  @Test (dependsOnMethods = "testDeregisterAfterConfigEnabled")
+  public void testDeregisterAfterConfigTimeoutShortened() throws Exception {
+    System.out.println("START " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+    long longDeregisterTimeout = 1000*60*60*24;
+    long shortDeregisterTimeout = 1000;
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+
+    // Create and immediately kill participants
+    List<MockParticipantManager> killedParticipants = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      MockParticipantManager participantToKill = addParticipant(CLUSTER_NAME, 
"participants_to_kill_" + i);
+      participantToKill.syncStop();
+      killedParticipants.add(participantToKill);
+    }
+
+    // Sleep so that participant offline time exceeds deregister timeout
+    Thread.sleep(shortDeregisterTimeout);
+
+    // Trigger on shorten deregister timeout
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+
+    // Assert participants have been deregistered
+    boolean result = TestHelper.verify(() -> {
+      List<String> instances = _admin.getInstancesInCluster(CLUSTER_NAME);
+      return killedParticipants.stream().noneMatch(participant -> 
instances.contains(participant.getInstanceName()));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participants should have been deregistered. 
Participants to deregister: "
+        + killedParticipants + " Remaining participants: in cluster " + 
_admin.getInstancesInCluster(CLUSTER_NAME));
+
+    // reset cluster state
+    killedParticipants.forEach(participant -> {
+      dropParticipant(CLUSTER_NAME, participant);
+    });
+    System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  @Override
+  public void dropParticipant(String clusterName, MockParticipantManager 
participant) {
+    _participants.remove(participant);
+    super.dropParticipant(clusterName, participant);
+  }
+
+  @Override
+  public MockParticipantManager addParticipant(String clusterName, String 
instanceName) {
+    MockParticipantManager toAddParticipant = 
super.addParticipant(clusterName, instanceName);
+    _participants.add(toAddParticipant);
+    return toAddParticipant;
+  }
+
+  private void setAutoDeregisterConfigs(String clusterName, long timeout) {
+    ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setParticipantDeregistrationTimeout(timeout);
+    _configAccessor.setClusterConfig(clusterName, clusterConfig);
+    // Allow participant auto-join to ensure compatibility with nodes re-joining when 
they re-establish connection
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+            CLUSTER_NAME).build();
+    _configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
"true");
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
index 7c93c33b6..c2b57f390 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestAddResourceWhenRequireDelayedRebalanceOverwrite.java
@@ -110,12 +110,10 @@ public class 
TestAddResourceWhenRequireDelayedRebalanceOverwrite extends ZkTestB
         + new Date(System.currentTimeMillis()));
   }
 
-  private MockParticipantManager addParticipant(String cluster, String 
instanceName) {
-    _gSetupTool.addInstanceToCluster(cluster, instanceName);
-    MockParticipantManager toAddParticipant =
-        new MockParticipantManager(ZK_ADDR, cluster, instanceName);
+  @Override
+  public MockParticipantManager addParticipant(String clusterName, String 
instanceName) {
+    MockParticipantManager toAddParticipant = 
super.addParticipant(clusterName, instanceName);
     _participants.add(toAddParticipant);
-    toAddParticipant.syncStart();
     return toAddParticipant;
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
index d7dc44205..c5946a331 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestForceKillInstance.java
@@ -132,7 +132,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -166,7 +166,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -203,7 +203,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -236,7 +236,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -271,7 +271,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should have assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -309,7 +309,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -341,7 +341,7 @@ public class TestForceKillInstance extends ZkTestBase {
     
_dataAccessor.removeProperty(_dataAccessor.keyBuilder().liveInstance(instanceToKillName));
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -375,7 +375,7 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
@@ -411,13 +411,14 @@ public class TestForceKillInstance extends ZkTestBase {
         "Instance should not have any assignments");
 
     // Reset state of cluster
-    dropParticipant(CLUSTER_NAME, instanceToKillName);
+    dropParticipant(CLUSTER_NAME, instanceToKill);
     addParticipant(CLUSTER_NAME, instanceToKillName);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
   }
 
-    private MockParticipantManager addParticipant(String cluster, String 
instanceName) {
+    @Override
+    public MockParticipantManager addParticipant(String cluster, String 
instanceName) {
       _gSetupTool.addInstanceToCluster(cluster, instanceName);
       MockParticipantManager toAddParticipant =
           new MockParticipantManager(ZK_ADDR, cluster, instanceName);
@@ -429,19 +430,10 @@ public class TestForceKillInstance extends ZkTestBase {
       return toAddParticipant;
     }
 
-    protected void dropParticipant(String cluster, String instanceName) {
-      // find mock participant manager with instanceName and remove it from 
_mockParticipantManagers.
-      MockParticipantManager toRemoveManager = _participants.stream()
-          .filter(manager -> manager.getInstanceName().equals(instanceName))
-          .findFirst()
-          .orElse(null);
-      if (toRemoveManager != null) {
-        toRemoveManager.syncStop();
-        _participants.remove(toRemoveManager);
-      }
-
-      InstanceConfig instanceConfig = 
_gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, instanceName);
-      _gSetupTool.getClusterManagementTool().dropInstance(cluster, 
instanceConfig);
+  @Override
+  public void dropParticipant(String cluster, MockParticipantManager 
participant) {
+    _participants.remove(participant);
+    super.dropParticipant(cluster, participant);
     }
 
   private Map<String, ExternalView> getEVs() {

Reply via email to