This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 62f0e9c81 Fix a race condition in metaclient leader election (#3058)
62f0e9c81 is described below
commit 62f0e9c814b7543cc26367f1c3841ca0d5db7ae5
Author: xyuanlu <[email protected]>
AuthorDate: Mon Aug 11 13:42:10 2025 -0700
Fix a race condition in metaclient leader election (#3058)
Fix a race condition in metaclient leader election.
Re-register listeners on reconnect.
---
.../leaderelection/LeaderElectionClient.java | 42 +++++++++-
.../recipes/leaderelection/LeaderInfo.java | 14 +++-
.../recipes/leaderelection/TestLeaderElection.java | 89 ++++++++++++++++++++++
3 files changed, 138 insertions(+), 7 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index fc25289a8..9f8513e1e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
@@ -40,6 +42,7 @@ import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable {
private final static String LEADER_ENTRY_KEY = "/LEADER";
private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";
+
private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
ReElectListener _reElectListener = new ReElectListener();
ConnectStateListener _connectStateListener = new ConnectStateListener();
+ private final ConcurrentHashMap<String, Set<LeaderElectionListenerInterfaceAdapter>> _leaderChangeListeners =
+ new ConcurrentHashMap<>();
/**
* Construct a LeaderElectionClient using a user passed in
leaderElectionConfig. It creates a MetaClient
@@ -189,9 +195,11 @@ public class LeaderElectionClient implements AutoCloseable {
}
private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
- _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
+ _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
+ registerAllListeners();
LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
leaderInfo.setLeaderName(_participant);
+ leaderInfo.setAcquiredTime();
try {
createPathIfNotExists(leaderPath);
@@ -208,8 +216,6 @@ public class LeaderElectionClient implements AutoCloseable {
} catch (MetaClientNodeExistsException ex) {
LOG.info("Already a leader in leader group {}.", leaderPath);
}
-
- _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
}
/**
@@ -349,6 +355,7 @@ public class LeaderElectionClient implements AutoCloseable {
*/
public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+ _leaderChangeListeners.computeIfAbsent(leaderPath + LEADER_ENTRY_KEY, k -> ConcurrentHashMap.newKeySet()).add(adapter);
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
_metaClient.subscribeStateChanges(adapter);
@@ -364,6 +371,7 @@ public class LeaderElectionClient implements AutoCloseable {
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
);
_metaClient.unsubscribeConnectStateChanges(adapter);
+ _leaderChangeListeners.get(leaderPath + LEADER_ENTRY_KEY).remove(adapter);
}
@Override
@@ -421,6 +429,8 @@ public class LeaderElectionClient implements AutoCloseable {
LOG.info("Participant {} already in leader group {}.",
_participant, leaderPath);
}
}
+
+ registerAllListeners();
// touch leader node to renew session ID
touchLeaderNode();
}
@@ -436,11 +446,15 @@ public class LeaderElectionClient implements AutoCloseable {
for (String leaderPath : _leaderGroups) {
String key = leaderPath;
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
+ LOG.info("touch leader node: current leader: {}, current participant: {}",
+ tup.left.getLeaderName(), _participant);
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
+ LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId());
+ newInfo.setAcquiredTime();
try {
LOG.info("Try touch leader node for path {}", _leaderGroups);
- _metaClient.set(key, tup.left, expectedVersion);
+ _metaClient.set(key, newInfo, expectedVersion);
} catch (MetaClientNoNodeException ex) {
LOG.info("leaderPath {} gone when retouch leader node.", key);
} catch (MetaClientBadVersionException e) {
@@ -452,6 +466,26 @@ public class LeaderElectionClient implements AutoCloseable {
}
}
+ private void registerAllListeners(){
+ // resubscribe the re-elect listener
+ for (String leaderPath : _leaderGroups) {
+ LOG.info("Subscribe re-elect listener for leaderPath {}.", leaderPath);
+ _metaClient.subscribeDataChange(leaderPath, _reElectListener, false);
+ }
+
+ // resubscribe to leader entry change since we are reconnected
+ for (Map.Entry<String, Set<LeaderElectionListenerInterfaceAdapter>> entry: _leaderChangeListeners.entrySet()) {
+ String leaderPath = entry.getKey();
+ LOG.info("Subscribe leader change listener for leaderPath {}.",leaderPath);
+ Set<LeaderElectionListenerInterfaceAdapter> listeners = entry.getValue();
+ for (LeaderElectionListenerInterfaceAdapter listener : listeners) {
+ _metaClient.subscribeDataChange(leaderPath,
+ listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
+ _metaClient.subscribeStateChanges(listener);
+ }
+ }
+ }
+
public MetaClientInterface getMetaClient() {
return _metaClient;
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
index ab0562502..2e1a4b2a3 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
@@ -53,15 +53,23 @@ public class LeaderInfo extends DataRecord {
PARTICIPANTS
}
-@JsonIgnore(true)
-public String getLeaderName() {
+ @JsonIgnore(true)
+ public String getLeaderName() {
return getSimpleField("LEADER_NAME");
}
@JsonIgnore(true)
public void setLeaderName(String id) {
- setSimpleField("LEADER_NAME", id);
+ setSimpleField("LEADER_NAME", id);
}
+ @JsonIgnore(true)
+ public void setAcquiredTime() {
+ setSimpleField("ACQUIRED_TIME", String.valueOf(System.currentTimeMillis()));
+ }
+ @JsonIgnore(true)
+ public String getAcquiredTime() {
+ return getSimpleField("ACQUIRED_TIME");
+ }
}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index c7aa06eff..8bb4841e7 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -55,6 +55,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
clt1.close();
}
+
@Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool")
public void testAcquireLeadership() throws Exception {
System.out.println("START TestLeaderElection.testAcquireLeadership");
@@ -382,6 +383,94 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
}
+ @Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire")
+ public void testClientLeadershipChangeListenersAfterExpire() throws Exception {
+ System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterEspire");
+ String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire";
+ LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+ participantInfo.setSimpleField("Key1", "value1");
+ LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+ participantInfo2.setSimpleField("Key2", "value2");
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+ final int[] numNewLeaderEvent = {0};
+ final int[] numLeaderGoneEvent = {0};
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(1);
+
+
+ LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() {
+
+ @Override
+ public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) {
+ if (type == ChangeType.LEADER_LOST) {
+ //countDownLatchLeaderGone.countDown();
+ Assert.assertEquals(curLeader.length(), 0);
+ numLeaderGoneEvent[0]++;
+ System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]);
+ } else if (type == ChangeType.LEADER_ACQUIRED) {
+ countDownLatchNewLeader.countDown();
+ numNewLeaderEvent[0]++;
+ System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader);
+ Assert.assertTrue(curLeader.length() != 0);
+ } else {
+ Assert.fail();
+ }
+ }
+ };
+ clt1.subscribeLeadershipChanges(leaderPath, listener);
+ // session expire and reconnect
+ expireSession((ZkMetaClient) clt1.getMetaClient());
+
+ // when session recreated, participant info node should maintain
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+ // clt1 closed and reconnected
+ simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());
+
+ // verify listener get called after session expire and reconnect
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+ // now clt1 should be leader
+ // verify we got a new leader event after node 2 left
+ Assert.assertTrue(MetaClientTestUtil.verify(()-> {
+ return (numNewLeaderEvent[0] == 1);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ countDownLatchNewLeader.await();
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ // have clt2 join and clt1 leave and join again, and verify listener still works
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+ clt1.exitLeaderElectionParticipantPool(leaderPath);
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt2.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt2.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ // now clt1 join again, and verify listener still works
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+ // verify clt1 is leader
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+
+ ((ZkMetaClient<?>) clt1.getMetaClient()).close();
+ System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+ }
+
+
private void joinPoolTestHelper(String leaderPath, LeaderElectionClient
clt1, LeaderElectionClient clt2)
throws Exception {
clt1.joinLeaderElectionParticipantPool(leaderPath);