This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new aa592c6c5 Clear removed instances from the cached offline time map 
(#3012)
aa592c6c5 is described below

commit aa592c6c5ea79df18865fcf85530036f6c7cb3a4
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Apr 2 13:34:01 2025 -0700

    Clear removed instances from the cached offline time map (#3012)
    
    Clear removed instances from the cached offline time map. This fixes a bug 
where stale entries in the map caused participants to be incorrectly 
deregistered after leaving and re-joining a cluster leveraging the participant 
auto deregistration feature.
---
 .../dataproviders/BaseControllerDataProvider.java  |  3 ++
 .../stages/ParticipantDeregistrationStage.java     |  2 -
 .../stages/TestParticipantDeregistrationStage.java | 53 +++++++++++++++++++++-
 3 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 997f0f8aa..1b4c3b026 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -1049,6 +1049,9 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
 
   private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
     if (!_updateInstanceOfflineTime) {
+      // Clean up entries for nodes that have been removed from the cluster. 
This prevents a stale offline time from
+      // being used when the node is re-added to the cluster but before it 
updates its offline time.
+      
_instanceOfflineTimeMap.keySet().retainAll(_allInstanceConfigCache.getPropertyMap().keySet());
       return;
     }
     List<String> offlineNodes = new 
ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
index a4fcab3e1..ef950a185 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
@@ -10,8 +10,6 @@ import 
org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.ParticipantHistory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline;
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
index 7e9bfabca..0fad7e2d1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
@@ -8,14 +8,18 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -31,6 +35,7 @@ public class TestParticipantDeregistrationStage extends 
ZkTestBase {
   private HelixDataAccessor _dataAccessor;
   private ClusterControllerManager _controller;
   private ConfigAccessor _configAccessor;
+  private BestPossibleExternalViewVerifier _verifier;
 
   @BeforeClass
   public void beforeClass() {
@@ -53,6 +58,9 @@ public class TestParticipantDeregistrationStage extends 
ZkTestBase {
     _dataAccessor = _controller.getHelixDataAccessor();
 
     setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+
+    _verifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
   }
 
   // Asserts that a node will be removed from the cluster after it exceeds the 
deregister timeout set in the cluster config
@@ -208,7 +216,7 @@ public class TestParticipantDeregistrationStage extends 
ZkTestBase {
         + new Date(System.currentTimeMillis()));
     long longDeregisterTimeout = 1000*60*60*24;
     long shortDeregisterTimeout = 1000;
-    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+    setAutoDeregisterConfigs(CLUSTER_NAME, longDeregisterTimeout);
 
     // Create and immediately kill participants
     List<MockParticipantManager> killedParticipants = new ArrayList<>();
@@ -222,7 +230,7 @@ public class TestParticipantDeregistrationStage extends 
ZkTestBase {
     Thread.sleep(shortDeregisterTimeout);
 
     // Shorten the deregister timeout to trigger deregistration
-    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+    setAutoDeregisterConfigs(CLUSTER_NAME, shortDeregisterTimeout);
 
     // Assert participants have been deregistered
     boolean result = TestHelper.verify(() -> {
@@ -236,10 +244,51 @@ public class TestParticipantDeregistrationStage extends 
ZkTestBase {
     killedParticipants.forEach(participant -> {
       dropParticipant(CLUSTER_NAME, participant);
     });
+    setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
     System.out.println("END " + TestHelper.getTestClassName() + "." + 
TestHelper.getTestMethodName() + " at "
         + new Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testParticipantDeregisteredAndRejoins() throws Exception {
+    String firstDB = "firstDB";
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, 3, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), null);
+    IdealState idealStateOne =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
firstDB);
+    idealStateOne.setMinActiveReplicas(2);
+    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
firstDB, idealStateOne);
+    // Set replica count to the number of instances in the cluster, so that we can 
ensure that each participant is assigned a replica
+    int replicaCount = 
_gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME).size();
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, replicaCount);
+
+    // Wait for cluster to converge
+    Assert.assertTrue(_verifier.verifyByPolling());
+
+    MockParticipantManager participantToDeregister = _participants.get(0);
+    participantToDeregister.syncStop();
+    boolean result = TestHelper.verify(() -> 
!_admin.getInstancesInCluster(CLUSTER_NAME)
+        .contains(participantToDeregister.getInstanceName()), 
TestHelper.WAIT_DURATION);
+    Assert.assertTrue(result, "Participant should have been deregistered");
+
+    addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+
+    // Wait for cluster to converge
+    Assert.assertTrue(_verifier.verifyByPolling());
+
+    // loop through partitions and ensure that the participant is assigned to 
at least one partition
+    ExternalView ev = 
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
firstDB);
+    boolean assigned = false;
+    for (String partition : ev.getPartitionSet()) {
+      if 
(ev.getStateMap(partition).containsKey(participantToDeregister.getInstanceName()))
 {
+        assigned = true;
+        break;
+      }
+    }
+    Assert.assertTrue(assigned, "Participant should have been assigned to at 
least one partition");
+  }
+
   @Override
   public void dropParticipant(String clusterName, MockParticipantManager 
participant) {
     _participants.remove(participant);

Reply via email to