This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new 49c29dc48 Fix race condition when reconnect in leader election client. (#2814) 49c29dc48 is described below commit 49c29dc4821b7671b752f701addd96ba85c19ab0 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Tue Jun 25 22:36:51 2024 -0700 Fix race condition when reconnect in leader election client. (#2814) Fix race condition when reconnect in leader election client. --- .../leaderelection/LeaderElectionClient.java | 27 ++++++++++++------ .../apache/helix/metaclient/impl/zk/TestUtil.java | 18 ++++++++++++ .../recipes/leaderelection/TestLeaderElection.java | 32 ++++++++++++++++++++-- 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index ae7d9c9fa..fc25289a8 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -38,7 +38,6 @@ import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.exception.MetaClientNoNodeException; import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; import org.apache.helix.metaclient.factories.MetaClientConfig; -import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; import org.slf4j.Logger; @@ -167,6 +166,7 @@ public class LeaderElectionClient implements AutoCloseable { // try to create participant info entry, assuming leader election group node is already there _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, participantInfo, 
MetaClientInterface.EntryMode.EPHEMERAL); + LOG.info("Participant {} joined leader group {}.", _participant, leaderPath); } catch (MetaClientNodeExistsException ex) { throw new ConcurrentModificationException("Already joined leader election group. ", ex); } catch (MetaClientNoNodeException ex) { @@ -261,6 +261,7 @@ public class LeaderElectionClient implements AutoCloseable { // deleting ZNode. So that handler in ReElectListener won't recreate the leader node. if (exitLeaderElectionParticipantPool) { _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY); + LOG.info("Leaving leader election pool {}.", leaderPath); _metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant); } // check if current participant is the leader @@ -272,12 +273,13 @@ public class LeaderElectionClient implements AutoCloseable { List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion)); //Execute transactional support on operations List<OpResult> opResults = _metaClient.transactionOP(ops); + LOG.info("Try relinquish leader {}.", leaderPath); if (opResults.get(0).getType() == ERRORRESULT) { if (isLeader(leaderPath)) { // Participant re-elected as leader. 
throw new ConcurrentModificationException("Concurrent operation, please retry"); } else { - LOG.info("Someone else is already leader"); + LOG.info("Someone else is already leader when relinquishing leadership for path {}.", leaderPath); } } } @@ -366,7 +368,7 @@ public class LeaderElectionClient implements AutoCloseable { @Override public void close() throws Exception { - + LOG.info("Closing leader election client."); _metaClient.unsubscribeConnectStateChanges(_connectStateListener); // exit all previous joined leader election groups @@ -406,14 +408,20 @@ public class LeaderElectionClient implements AutoCloseable { @Override public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState, MetaClientInterface.ConnectState currentState) throws Exception { - if (prevState == MetaClientInterface.ConnectState.EXPIRED - && currentState == MetaClientInterface.ConnectState.CONNECTED) { + LOG.info("Connect state changed from {} to {}", prevState, currentState); + if (currentState == MetaClientInterface.ConnectState.CONNECTED) { + // when reconnected, we try to recreate the ephemeral node for participant for (String leaderPath : _participantInfos.keySet()) { - _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath), - MetaClientInterface.EntryMode.EPHEMERAL); + try { + LOG.info("Recreate participant node for leaderPath {}.", leaderPath); + _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath), + MetaClientInterface.EntryMode.EPHEMERAL); + } catch (MetaClientNodeExistsException ex) { + // If reconnected before expire, the ephemeral node is still there. 
+ LOG.info("Participant {} already in leader group {}.", _participant, leaderPath); + } } - } else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED - && currentState == MetaClientInterface.ConnectState.CONNECTED) { + // touch leader node to renew session ID touchLeaderNode(); } } @@ -431,6 +439,7 @@ public class LeaderElectionClient implements AutoCloseable { if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { int expectedVersion = tup.right.getVersion(); try { + LOG.info("Try touch leader node for path {}", _leaderGroups); _metaClient.set(key, tup.left, expectedVersion); } catch (MetaClientNoNodeException ex) { LOG.info("leaderPath {} gone when retouch leader node.", key); diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java index 067fe3eb5..92c0182b3 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java @@ -185,4 +185,22 @@ public class TestUtil { zkClient.process(event); } + /** + * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly. + * This needs to be done in a separate thread to simulate ZkClient eventThread.
+ */ + public static void simulateZkStateClosedAndReconnect(ZkMetaClient client) throws InterruptedException { + final ZkClient zkClient = client.getZkClient(); + WatchedEvent event = + new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Closed, + null); + zkClient.process(event); + + Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN); + + event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, + null); + zkClient.process(event); + } + } \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index 248643652..c7aa06eff 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -5,7 +5,6 @@ import org.apache.helix.metaclient.MetaClientTestUtil; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.helix.metaclient.factories.MetaClientConfig; -import org.apache.helix.metaclient.impl.zk.TestUtil; import org.apache.helix.metaclient.impl.zk.ZkMetaClient; import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; @@ -41,6 +40,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase { } } + // Test that calling isLeader before client joins LeaderElectionParticipantPool returns false and does not throw NPE @Test public void testIsLeaderBeforeJoiningParticipantPool() throws Exception { @@ -299,7 +299,7 @@ public class TestLeaderElection extends ZkMetaClientTestBase { System.out.println("END TestLeaderElection.testSessionExpire"); } - @Test(dependsOnMethods = "testSessionExpire") + @Test (dependsOnMethods = "testSessionExpire") public void 
testClientDisconnectAndReconnectBeforeExpire() throws Exception { System.out.println("START TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire"); String leaderPath = LEADER_PATH + "/testClientDisconnectAndReconnectBeforeExpire"; @@ -354,6 +354,34 @@ public class TestLeaderElection extends ZkMetaClientTestBase { System.out.println("END TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire"); } + @Test(dependsOnMethods = "testClientDisconnectAndReconnectBeforeExpire") + public void testClientClosedAndReconnectAfterExpire() throws Exception { + System.out.println("START TestLeaderElection.testClientClosedAndReconnectAfterExpire"); + String leaderPath = LEADER_PATH + "/testClientClosedAndReconnectAfterExpire"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + + // session expire and reconnect + expireSession((ZkMetaClient) clt1.getMetaClient()); + // clt1 closed and reconnected + simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient()); + + // when session recreated, participant info node should maintain + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + + ((ZkMetaClient<?>) clt1.getMetaClient()).close(); + clt2.close(); + System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); + } + private void joinPoolTestHelper(String leaderPath, 
LeaderElectionClient clt1, LeaderElectionClient clt2) throws Exception { clt1.joinLeaderElectionParticipantPool(leaderPath);