This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 62f0e9c81 Fix a race condition in metaclient leader election (#3058)
62f0e9c81 is described below

commit 62f0e9c814b7543cc26367f1c3841ca0d5db7ae5
Author: xyuanlu <[email protected]>
AuthorDate: Mon Aug 11 13:42:10 2025 -0700

    Fix a race condition in metaclient leader election (#3058)
    
    Fix a race condition in metaclient leader election.
    Re-register listeners on reconnect.
---
 .../leaderelection/LeaderElectionClient.java       | 42 +++++++++-
 .../recipes/leaderelection/LeaderInfo.java         | 14 +++-
 .../recipes/leaderelection/TestLeaderElection.java | 89 ++++++++++++++++++++++
 3 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index fc25289a8..9f8513e1e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -27,6 +27,8 @@ import java.util.List;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.helix.metaclient.api.ConnectStateChangeListener;
 import org.apache.helix.metaclient.api.DataChangeListener;
@@ -40,6 +42,7 @@ import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable {
 
   private final static String LEADER_ENTRY_KEY = "/LEADER";
   private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";
+
   private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
   ReElectListener _reElectListener = new ReElectListener();
   ConnectStateListener _connectStateListener = new ConnectStateListener();
+  private final ConcurrentHashMap<String, 
Set<LeaderElectionListenerInterfaceAdapter>> _leaderChangeListeners =
+          new ConcurrentHashMap<>();
 
   /**
    * Construct a LeaderElectionClient using a user passed in 
leaderElectionConfig. It creates a MetaClient
@@ -189,9 +195,11 @@ public class LeaderElectionClient implements AutoCloseable {
   }
 
   private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
-    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, 
_reElectListener, false);
+    _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
+    registerAllListeners();
     LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
     leaderInfo.setLeaderName(_participant);
+    leaderInfo.setAcquiredTime();
 
     try {
       createPathIfNotExists(leaderPath);
@@ -208,8 +216,6 @@ public class LeaderElectionClient implements AutoCloseable {
     } catch (MetaClientNodeExistsException ex) {
       LOG.info("Already a leader in leader group {}.", leaderPath);
     }
-
-    _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
   }
 
   /**
@@ -349,6 +355,7 @@ public class LeaderElectionClient implements AutoCloseable {
    */
   public boolean subscribeLeadershipChanges(String leaderPath, 
LeaderElectionListenerInterface listener) {
     LeaderElectionListenerInterfaceAdapter adapter = new 
LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
+    _leaderChangeListeners.computeIfAbsent(leaderPath + LEADER_ENTRY_KEY, k -> 
ConcurrentHashMap.newKeySet()).add(adapter);
     _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
         adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe 
event when path is not there
     _metaClient.subscribeStateChanges(adapter);
@@ -364,6 +371,7 @@ public class LeaderElectionClient implements AutoCloseable {
     _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
         );
     _metaClient.unsubscribeConnectStateChanges(adapter);
+    _leaderChangeListeners.get(leaderPath + LEADER_ENTRY_KEY).remove(adapter);
   }
 
   @Override
@@ -421,6 +429,8 @@ public class LeaderElectionClient implements AutoCloseable {
             LOG.info("Participant {} already in leader group {}.", 
_participant, leaderPath);
           }
         }
+
+        registerAllListeners();
         // touch leader node to renew session ID
         touchLeaderNode();
       }
@@ -436,11 +446,15 @@ public class LeaderElectionClient implements AutoCloseable {
     for (String leaderPath : _leaderGroups) {
       String key = leaderPath;
       ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = 
_metaClient.getDataAndStat(key);
+      LOG.info("touch leader node: current leader: {}, current participant: 
{}",
+          tup.left.getLeaderName(), _participant);
       if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
         int expectedVersion = tup.right.getVersion();
+        LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId());
+        newInfo.setAcquiredTime();
         try {
           LOG.info("Try touch leader node for path {}", _leaderGroups);
-          _metaClient.set(key, tup.left, expectedVersion);
+          _metaClient.set(key, newInfo, expectedVersion);
         } catch (MetaClientNoNodeException ex) {
           LOG.info("leaderPath {} gone when retouch leader node.", key);
         } catch (MetaClientBadVersionException e) {
@@ -452,6 +466,26 @@ public class LeaderElectionClient implements AutoCloseable {
     }
   }
 
+  private void registerAllListeners(){
+    // (Re)subscribe _reElectListener on every leader entry path in _leaderGroups; runs on join and again after reconnect.
+    for (String leaderPath : _leaderGroups) {
+      LOG.info("Subscribe re-elect listener for leaderPath {}.", leaderPath);
+      _metaClient.subscribeDataChange(leaderPath, _reElectListener, false);
+    }
+
+    // (Re)subscribe every user adapter (data + connect state). NOTE(review): this also re-adds adapters already subscribed by subscribeLeadershipChanges; assumes MetaClient de-dups repeat subscriptions - confirm, else callbacks may fire more than once.
+    for (Map.Entry<String, Set<LeaderElectionListenerInterfaceAdapter>> entry: 
_leaderChangeListeners.entrySet()) {
+      String leaderPath = entry.getKey();
+      LOG.info("Subscribe leader change listener for leaderPath 
{}.",leaderPath);
+      Set<LeaderElectionListenerInterfaceAdapter> listeners = entry.getValue();
+      for (LeaderElectionListenerInterfaceAdapter listener : listeners) {
+        _metaClient.subscribeDataChange(leaderPath,
+                listener, false /*skipWatchingNonExistNode*/); // we need to 
subscribe event when path is not there
+        _metaClient.subscribeStateChanges(listener);
+      }
+    }
+  }
+
   public MetaClientInterface getMetaClient() {
     return _metaClient;
   }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
index ab0562502..2e1a4b2a3 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
@@ -53,15 +53,23 @@ public class LeaderInfo extends DataRecord {
     PARTICIPANTS
   }
 
-@JsonIgnore(true)
-public String getLeaderName() {
+  @JsonIgnore(true)
+  public String getLeaderName() {
     return getSimpleField("LEADER_NAME");
   }
 
   @JsonIgnore(true)
   public void setLeaderName(String id) {
-     setSimpleField("LEADER_NAME", id);
+    setSimpleField("LEADER_NAME", id);
   }
 
+  @JsonIgnore(true)
+  public void setAcquiredTime() { // store the current wall-clock time (epoch millis, as a string) under ACQUIRED_TIME
+    setSimpleField("ACQUIRED_TIME", 
String.valueOf(System.currentTimeMillis()));
+  }
 
+  @JsonIgnore(true)
+  public String getAcquiredTime() { // raw ACQUIRED_TIME value; presumably null if setAcquiredTime() was never called
+    return getSimpleField("ACQUIRED_TIME");
+  }
 }
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index c7aa06eff..8bb4841e7 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -55,6 +55,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     clt1.close();
   }
 
+
   @Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool")
   public void testAcquireLeadership() throws Exception {
     System.out.println("START TestLeaderElection.testAcquireLeadership");
@@ -382,6 +383,94 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     System.out.println("END 
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
   }
 
+  @Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire")
+  public void testClientLeadershipChangeListenersAfterExpire() throws 
Exception {
+    System.out.println("START 
TestLeaderElection.testClientLeadershipChangeListenersAfterEspire");
+    String leaderPath = LEADER_PATH + 
"/testClientLeadershipChangeListenersAfterEspire"; // NOTE(review): "Espire" is a typo for "Expire" (here and in START); it only names this test's path
+    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+    participantInfo.setSimpleField("Key1", "value1");
+    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+    participantInfo2.setSimpleField("Key2", "value2");
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+    final int[] numNewLeaderEvent = {0};
+    final int[] numLeaderGoneEvent = {0};
+    CountDownLatch countDownLatchNewLeader = new CountDownLatch(1);
+
+    // The listener below bumps the int[1] counters (mutable holders for the anonymous class) and trips the latch on LEADER_ACQUIRED.
+    LeaderElectionListenerInterface listener = new 
LeaderElectionListenerInterface() {
+
+      @Override
+      public void onLeadershipChange(String leaderPath, ChangeType type, 
String curLeader) {
+        if (type == ChangeType.LEADER_LOST) {
+          // NOTE(review): asserts in this callback may run on a MetaClient event thread and so may not fail the test - confirm.
+          Assert.assertEquals(curLeader.length(), 0);
+          numLeaderGoneEvent[0]++;
+          System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]);
+        } else if (type == ChangeType.LEADER_ACQUIRED) {
+          countDownLatchNewLeader.countDown();
+          numNewLeaderEvent[0]++;
+            System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader);
+          Assert.assertTrue(curLeader.length() != 0);
+        } else {
+          Assert.fail();
+        }
+      }
+    };
+    clt1.subscribeLeadershipChanges(leaderPath,  listener);
+    // expire clt1's session (the client then reconnects with a new session)
+    expireSession((ZkMetaClient) clt1.getMetaClient());
+
+    // after clt1's session is recreated, both participants' info nodes should still hold their data
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+    // simulate clt1's connection being closed and then reconnected
+    simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());
+
+    // clt2 leaves the pool; clt1's listener must still fire after the expiry/reconnect above
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+    // now clt1 should be leader
+    // verify exactly one LEADER_ACQUIRED event was seen after clt2 left
+    Assert.assertTrue(MetaClientTestUtil.verify(()-> {
+      return (numNewLeaderEvent[0] == 1);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    countDownLatchNewLeader.await(); // NOTE(review): untimed wait; already satisfied once the verify above passes
+
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    // have clt2 join and clt1 leave and join again, and verify listener still 
works
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+    clt1.exitLeaderElectionParticipantPool(leaderPath);
+
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt2.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt2.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    // clt1 rejoins and clt2 leaves (NOTE(review): listener event counts are not re-checked from here on)
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+    // verify clt1 sees a leader (NOTE(review): only checks non-null, not that it is PARTICIPANT_NAME1)
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    // NOTE(review): only clt1's MetaClient is closed below; clt2 stays open, and the END message names a different test.
+    ((ZkMetaClient<?>) clt1.getMetaClient()).close();
+    System.out.println("END 
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+  }
+
+
   private void joinPoolTestHelper(String leaderPath, LeaderElectionClient 
clt1, LeaderElectionClient clt2)
       throws Exception {
     clt1.joinLeaderElectionParticipantPool(leaderPath);

Reply via email to