This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a8eb7faa3 ZOOKEEPER-3624: Fix flaky 
`QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss`
a8eb7faa3 is described below

commit a8eb7faa34e90c748f5f49f211a6dbad78c16f0b
Author: Kezhu Wang <[email protected]>
AuthorDate: Sat Jan 4 06:16:55 2025 +0800

    ZOOKEEPER-3624: Fix flaky 
`QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss`
    
    Reviewers: anmolnar
    Author: kezhuw
    Closes #2204 from 
kezhuw/ZOOKEEPER-3624-fix-flaky-testFailedTxnAsPartOfQuorumLoss
---
 .../apache/zookeeper/server/quorum/QuorumPeer.java |   1 +
 .../server/quorum/QuorumPeerMainTest.java          | 102 +++++++++++++--------
 2 files changed, 65 insertions(+), 38 deletions(-)

diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 876a297f9..786450d35 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -1578,6 +1578,7 @@ public class QuorumPeer extends ZooKeeperThread 
implements QuorumStats.Provider
                     } else {
                         try {
                             reconfigFlagClear();
+                            checkSuspended();
                             if (shuttingDownLE) {
                                 shuttingDownLE = false;
                                 startLeaderElection();
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 6c0f4f926..eb4966e75 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -706,6 +706,9 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         assertEquals(maxSessionTimeOut, quorumPeer.getMaxSessionTimeout(), 
"maximumSessionTimeOut is wrong");
     }
 
+    /**
+     * Verify that failed txn in isolated leader got truncated after rejoining 
quorum.
+     */
     @Test
     public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
         final int LEADER_TIMEOUT_MS = 10_000;
@@ -729,6 +732,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         // increase the tick time to delay the leader going to looking
         int previousTick = servers.mt[leader].main.quorumPeer.tickTime;
         servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+        // isolate it from other quorum members by prevent it from rejoining
+        servers.mt[leader].getQuorumPeer().setSuspended(true);
         // let the previous tick on the leader exhaust itself so the new tick 
time takes effect
         Thread.sleep(previousTick);
         LOG.warn("LEADER {}", leader);
@@ -739,34 +744,18 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
             }
         }
 
-        // 3. start up the followers to form a new quorum
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i != leader) {
-                servers.mt[i].start();
-            }
-        }
-
-        // 4. wait one of the follower to be the new leader
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            if (i != leader) {
-                // Recreate a client session since the previous session was 
not persisted.
-                servers.restartClient(i, this);
-                waitForOne(servers.zk[i], States.CONNECTED);
-            }
-        }
-
-        // 5. send a create request to old leader and make sure it's synced to 
disk,
+        // 3. send a create request to old leader and make sure it's synced to 
disk,
         //    which means it acked from itself
         try {
             servers.zk[leader].create("/zk" + leader, "zk".getBytes(), 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             fail("create /zk" + leader + " should have failed");
-        } catch (KeeperException e) {
+        } catch (KeeperException ignored) {
         }
 
-        // just make sure that we actually did get it in process at the
-        // leader
+        // just make sure that we actually did get it in process at the leader
+        //
         // there can be extra sessionClose proposals
-        assertTrue(outstanding.size() > 0);
+        assertFalse(outstanding.isEmpty());
         Proposal p = findProposalOfType(outstanding, OpCode.create);
         LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
         assertNotNull(p, "Old leader doesn't have 'create' proposal");
@@ -782,36 +771,73 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
             sleepTime += 100;
         }
 
-        // 6. wait for the leader to quit due to not enough followers and come 
back up as a part of the new quorum
-        LOG.info("Waiting for leader {} to timeout followers", leader);
+        // 4. start up the followers to form a new quorum
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                servers.mt[i].start();
+            }
+        }
+
+        // 5. wait one of the follower to be the new leader
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i != leader) {
+                // Recreate a new client session to avoid ConnectionLoss as 
connecting server is restarted.
+                servers.restartClient(i, this);
+                waitForOne(servers.zk[i], States.CONNECTED);
+            }
+        }
+
+        // 6. make sure new quorum does not replicate the failed txn
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (i == leader) {
+                continue;
+            }
+            assertNull(servers.zk[i].exists("/zk" + leader, false),
+                    "server " + i + " should not have /zk" + leader);
+        }
+
+        // resume election to rejoin the cluster
+        servers.mt[leader].getQuorumPeer().setSuspended(false);
+
+        // 7. wait for the leader to quit due to not enough followers and come 
back up as a part of the new quorum
+        LOG.info("Waiting for leader {} to timeout and rejoin as follower", 
leader);
         sleepTime = 0;
-        Follower f = servers.mt[leader].main.quorumPeer.follower;
-        while (f == null || !f.isRunning()) {
-            if (sleepTime > LEADER_TIMEOUT_MS * 2) {
-                fail("Took too long for old leader to time out "
+        while (servers.mt[leader].getQuorumPeer().getPeerState() != 
QuorumPeer.ServerState.FOLLOWING) {
+            if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2) {
+                fail("Took too long for old leader to time out and rejoin "
                                     + 
servers.mt[leader].main.quorumPeer.getPeerState());
             }
             Thread.sleep(100);
             sleepTime += 100;
-            f = servers.mt[leader].main.quorumPeer.follower;
         }
 
         int newLeader = servers.findLeader();
         // make sure a different leader was elected
         assertNotEquals(leader, newLeader);
 
-        // 7. restart the previous leader to force it to replay the edits and 
possibly come up in a bad state
-        servers.mt[leader].shutdown();
-        servers.mt[leader].start();
-        // old client session can expire, restart it
+        // Now, all preconditions meet. Let's verify that the failed txn got 
truncated in whole cluster.
+
+        boolean restarted = false;
         servers.restartClient(leader, this);
-        waitForAll(servers, States.CONNECTED);
+        waitForOne(servers.zk[leader], States.CONNECTED);
+        while (true) {
+            // 7. make sure everything is consistent, that is the failed txn 
got truncated in old leader.
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                assertNull(servers.zk[i].exists("/zk" + leader, false),
+                        "server " + i + " should not have /zk" + leader);
+            }
 
-        // 8. check the node exist in previous leader but not others
-        //    make sure everything is consistent
-        for (int i = 0; i < SERVER_COUNT; i++) {
-            assertNull(servers.zk[i].exists("/zk" + leader, false),
-                    "server " + i + " should not have /zk" + leader);
+            if (restarted) {
+                break;
+            }
+
+            // 8. make sure above holds after restart
+            servers.mt[leader].shutdown();
+            servers.mt[leader].start();
+            // old client session can expire, restart it
+            servers.restartClient(leader, this);
+            waitForAll(servers, States.CONNECTED);
+            restarted = true;
         }
     }
 

Reply via email to