This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c29dc48 Fix race condition when reconnect in leader election client. (#2814)
49c29dc48 is described below

commit 49c29dc4821b7671b752f701addd96ba85c19ab0
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Tue Jun 25 22:36:51 2024 -0700

    Fix race condition when reconnect in leader election client. (#2814)
    
    Fix race condition when reconnect in leader election client.
---
 .../leaderelection/LeaderElectionClient.java       | 27 ++++++++++++------
 .../apache/helix/metaclient/impl/zk/TestUtil.java  | 18 ++++++++++++
 .../recipes/leaderelection/TestLeaderElection.java | 32 ++++++++++++++++++++--
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index ae7d9c9fa..fc25289a8 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -38,7 +38,6 @@ import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
 import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
 import org.slf4j.Logger;
@@ -167,6 +166,7 @@ public class LeaderElectionClient implements AutoCloseable {
       // try to create participant info entry, assuming leader election group 
node is already there
       _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, participantInfo,
           MetaClientInterface.EntryMode.EPHEMERAL);
+      LOG.info("Participant {} joined leader group {}.", _participant, 
leaderPath);
     } catch (MetaClientNodeExistsException ex) {
       throw new ConcurrentModificationException("Already joined leader 
election group. ", ex);
     } catch (MetaClientNoNodeException ex) {
@@ -261,6 +261,7 @@ public class LeaderElectionClient implements AutoCloseable {
     // deleting ZNode. So that handler in ReElectListener won't recreate the leader node.
     if (exitLeaderElectionParticipantPool) {
       _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+      LOG.info("Leaving leader election pool {}.", leaderPath);
       _metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant);
     }
     // check if current participant is the leader
@@ -272,12 +273,13 @@ public class LeaderElectionClient implements AutoCloseable {
         List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), 
Op.delete(key, expectedVersion));
         //Execute transactional support on operations
         List<OpResult> opResults = _metaClient.transactionOP(ops);
+        LOG.info("Try relinquish leader {}.", leaderPath);
         if (opResults.get(0).getType() == ERRORRESULT) {
           if (isLeader(leaderPath)) {
             // Participant re-elected as leader.
             throw new ConcurrentModificationException("Concurrent operation, 
please retry");
           } else {
-            LOG.info("Someone else is already leader");
+            LOG.info("Someone else is already leader when relinquishing 
leadership for path {}.", leaderPath);
           }
         }
       }
@@ -366,7 +368,7 @@ public class LeaderElectionClient implements AutoCloseable {
 
   @Override
   public void close() throws Exception {
-
+    LOG.info("Closing leader election client.");
     _metaClient.unsubscribeConnectStateChanges(_connectStateListener);
 
     // exit all previous joined leader election groups
@@ -406,14 +408,20 @@ public class LeaderElectionClient implements AutoCloseable {
     @Override
     public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
         MetaClientInterface.ConnectState currentState) throws Exception {
-      if (prevState == MetaClientInterface.ConnectState.EXPIRED
-          && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+      LOG.info("Connect state changed from {} to {}", prevState, currentState);
+      if (currentState == MetaClientInterface.ConnectState.CONNECTED) {
+        // when reconnected, we try to recreate the ephemeral node for participant
         for (String leaderPath : _participantInfos.keySet()) {
-          _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, _participantInfos.get(leaderPath),
-              MetaClientInterface.EntryMode.EPHEMERAL);
+          try {
+            LOG.info("Recreate participant node for leaderPath {}.", 
leaderPath);
+            _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, _participantInfos.get(leaderPath),
+                MetaClientInterface.EntryMode.EPHEMERAL);
+          } catch (MetaClientNodeExistsException ex) {
+            // If reconnected before expire, the ephemeral node is still there.
+            LOG.info("Participant {} already in leader group {}.", 
_participant, leaderPath);
+          }
         }
-      } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
-          && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+        // touch leader node to renew session ID
         touchLeaderNode();
       }
     }
@@ -431,6 +439,7 @@ public class LeaderElectionClient implements AutoCloseable {
       if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
         int expectedVersion = tup.right.getVersion();
         try {
+          LOG.info("Try touch leader node for path {}", _leaderGroups);
           _metaClient.set(key, tup.left, expectedVersion);
         } catch (MetaClientNoNodeException ex) {
           LOG.info("leaderPath {} gone when retouch leader node.", key);
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index 067fe3eb5..92c0182b3 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -185,4 +185,22 @@ public class TestUtil {
     zkClient.process(event);
   }
 
+  /**
+   * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
+   * This need to be done in a separate thread to simulate ZkClient eventThread.
+   */
+  public static void simulateZkStateClosedAndReconnect(ZkMetaClient client) 
throws InterruptedException {
+    final ZkClient zkClient = client.getZkClient();
+    WatchedEvent event =
+        new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Closed,
+            null);
+    zkClient.process(event);
+
+    Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+    event = new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.SyncConnected,
+        null);
+    zkClient.process(event);
+  }
+
 }
\ No newline at end of file
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 248643652..c7aa06eff 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -5,7 +5,6 @@ import org.apache.helix.metaclient.MetaClientTestUtil;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.TestUtil;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
@@ -41,6 +40,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     }
   }
 
+
   // Test that calling isLeader before client joins LeaderElectionParticipantPool returns false and does not throw NPE
   @Test
   public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
@@ -299,7 +299,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     System.out.println("END TestLeaderElection.testSessionExpire");
   }
 
-  @Test(dependsOnMethods = "testSessionExpire")
+  @Test (dependsOnMethods = "testSessionExpire")
   public void testClientDisconnectAndReconnectBeforeExpire() throws Exception {
     System.out.println("START 
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
     String leaderPath = LEADER_PATH + 
"/testClientDisconnectAndReconnectBeforeExpire";
@@ -354,6 +354,34 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
     System.out.println("END 
TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
   }
 
+  @Test(dependsOnMethods = "testClientDisconnectAndReconnectBeforeExpire")
+  public void testClientClosedAndReconnectAfterExpire() throws Exception {
+    System.out.println("START 
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+    String leaderPath = LEADER_PATH + 
"/testClientClosedAndReconnectAfterExpire";
+    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+    participantInfo.setSimpleField("Key1", "value1");
+    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+    participantInfo2.setSimpleField("Key2", "value2");
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+    // session expire and reconnect
+    expireSession((ZkMetaClient) clt1.getMetaClient());
+    // clt1 closed and reconnected
+    simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());
+
+    // when session recreated, participant info node should maintain
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+    ((ZkMetaClient<?>) clt1.getMetaClient()).close();
+    clt2.close();
+    System.out.println("END 
TestLeaderElection.testClientClosedAndReconnectAfterExpire");
+  }
+
   private void joinPoolTestHelper(String leaderPath, LeaderElectionClient 
clt1, LeaderElectionClient clt2)
       throws Exception {
     clt1.joinLeaderElectionParticipantPool(leaderPath);

Reply via email to