This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new aa592c6c5 Clear removed instances from the cached offline time map
(#3012)
aa592c6c5 is described below
commit aa592c6c5ea79df18865fcf85530036f6c7cb3a4
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Apr 2 13:34:01 2025 -0700
Clear removed instances from the cached offline time map (#3012)
Clear removed instances from the cached offline time map. This fixes a bug
where stale entries in the map caused participants to be incorrectly
deregistered after leaving and re-joining a cluster leveraging the participant
auto deregistration feature.
---
.../dataproviders/BaseControllerDataProvider.java | 3 ++
.../stages/ParticipantDeregistrationStage.java | 2 -
.../stages/TestParticipantDeregistrationStage.java | 53 +++++++++++++++++++++-
3 files changed, 54 insertions(+), 4 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 997f0f8aa..1b4c3b026 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -1049,6 +1049,9 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
if (!_updateInstanceOfflineTime) {
+ // Clean up entries for nodes that have been removed from the cluster.
This prevents a stale offline time from
+ // being used when the node is re-added to the cluster but before it
updates its offline time.
+
_instanceOfflineTimeMap.keySet().retainAll(_allInstanceConfigCache.getPropertyMap().keySet());
return;
}
List<String> offlineNodes = new
ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
index a4fcab3e1..ef950a185 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/ParticipantDeregistrationStage.java
@@ -10,8 +10,6 @@ import
org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.ParticipantHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.util.RebalanceUtil.scheduleOnDemandPipeline;
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
index 7e9bfabca..0fad7e2d1 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestParticipantDeregistrationStage.java
@@ -8,14 +8,18 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -31,6 +35,7 @@ public class TestParticipantDeregistrationStage extends
ZkTestBase {
private HelixDataAccessor _dataAccessor;
private ClusterControllerManager _controller;
private ConfigAccessor _configAccessor;
+ private BestPossibleExternalViewVerifier _verifier;
@BeforeClass
public void beforeClass() {
@@ -53,6 +58,9 @@ public class TestParticipantDeregistrationStage extends
ZkTestBase {
_dataAccessor = _controller.getHelixDataAccessor();
setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+
+ _verifier = new
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
}
// Asserts that a node will be removed from the cluster after it exceeds the
deregister timeout set in the cluster config
@@ -208,7 +216,7 @@ public class TestParticipantDeregistrationStage extends
ZkTestBase {
+ new Date(System.currentTimeMillis()));
long longDeregisterTimeout = 1000*60*60*24;
long shortDeregisterTimeout = 1000;
- setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+ setAutoDeregisterConfigs(CLUSTER_NAME, longDeregisterTimeout);
// Create and immediately kill participants
List<MockParticipantManager> killedParticipants = new ArrayList<>();
@@ -222,7 +230,7 @@ public class TestParticipantDeregistrationStage extends
ZkTestBase {
Thread.sleep(shortDeregisterTimeout);
// Trigger deregistration by shortening the deregister timeout
- setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
+ setAutoDeregisterConfigs(CLUSTER_NAME, shortDeregisterTimeout);
// Assert participants have been deregistered
boolean result = TestHelper.verify(() -> {
@@ -236,10 +244,51 @@ public class TestParticipantDeregistrationStage extends
ZkTestBase {
killedParticipants.forEach(participant -> {
dropParticipant(CLUSTER_NAME, participant);
});
+ setAutoDeregisterConfigs(CLUSTER_NAME, DEREGISTER_TIMEOUT);
System.out.println("END " + TestHelper.getTestClassName() + "." +
TestHelper.getTestMethodName() + " at "
+ new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testParticipantDeregisteredAndRejoins() throws Exception {
+ String firstDB = "firstDB";
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, 3, "LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO.name(), null);
+ IdealState idealStateOne =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
firstDB);
+ idealStateOne.setMinActiveReplicas(2);
+ idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
firstDB, idealStateOne);
+ // Set replica count to the number of instances in the cluster, so that we can
ensure that each participant is assigned a replica
+ int replicaCount =
_gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME).size();
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, replicaCount);
+
+ // Wait for cluster to converge
+ Assert.assertTrue(_verifier.verifyByPolling());
+
+ MockParticipantManager participantToDeregister = _participants.get(0);
+ participantToDeregister.syncStop();
+ boolean result = TestHelper.verify(() ->
!_admin.getInstancesInCluster(CLUSTER_NAME)
+ .contains(participantToDeregister.getInstanceName()),
TestHelper.WAIT_DURATION);
+ Assert.assertTrue(result, "Participant should have been deregistered");
+
+ addParticipant(CLUSTER_NAME, participantToDeregister.getInstanceName());
+
+ // Wait for cluster to converge
+ Assert.assertTrue(_verifier.verifyByPolling());
+
+ // loop through partitions and ensure that the participant is assigned to
at least one partition
+ ExternalView ev =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
firstDB);
+ boolean assigned = false;
+ for (String partition : ev.getPartitionSet()) {
+ if
(ev.getStateMap(partition).containsKey(participantToDeregister.getInstanceName()))
{
+ assigned = true;
+ break;
+ }
+ }
+ Assert.assertTrue(assigned, "Participant should have been assigned to at
least one partition");
+ }
+
@Override
public void dropParticipant(String clusterName, MockParticipantManager
participant) {
_participants.remove(participant);