This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a8eb7faa3 ZOOKEEPER-3624: Fix flaky
`QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss`
a8eb7faa3 is described below
commit a8eb7faa34e90c748f5f49f211a6dbad78c16f0b
Author: Kezhu Wang <[email protected]>
AuthorDate: Sat Jan 4 06:16:55 2025 +0800
ZOOKEEPER-3624: Fix flaky
`QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss`
Reviewers: anmolnar
Author: kezhuw
Closes #2204 from
kezhuw/ZOOKEEPER-3624-fix-flaky-testFailedTxnAsPartOfQuorumLoss
---
.../apache/zookeeper/server/quorum/QuorumPeer.java | 1 +
.../server/quorum/QuorumPeerMainTest.java | 102 +++++++++++++--------
2 files changed, 65 insertions(+), 38 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 876a297f9..786450d35 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -1578,6 +1578,7 @@ public class QuorumPeer extends ZooKeeperThread
implements QuorumStats.Provider
} else {
try {
reconfigFlagClear();
+ checkSuspended();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index 6c0f4f926..eb4966e75 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -706,6 +706,9 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
assertEquals(maxSessionTimeOut, quorumPeer.getMaxSessionTimeout(),
"maximumSessionTimeOut is wrong");
}
+ /**
+ * Verify that a failed txn in an isolated leader gets truncated after rejoining
the quorum.
+ */
@Test
public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
@@ -729,6 +732,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
// increase the tick time to delay the leader going to looking
int previousTick = servers.mt[leader].main.quorumPeer.tickTime;
servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
+ // isolate it from other quorum members by preventing it from rejoining
+ servers.mt[leader].getQuorumPeer().setSuspended(true);
// let the previous tick on the leader exhaust itself so the new tick
time takes effect
Thread.sleep(previousTick);
LOG.warn("LEADER {}", leader);
@@ -739,34 +744,18 @@ public class QuorumPeerMainTest extends
QuorumPeerTestBase {
}
}
- // 3. start up the followers to form a new quorum
- for (int i = 0; i < SERVER_COUNT; i++) {
- if (i != leader) {
- servers.mt[i].start();
- }
- }
-
- // 4. wait one of the follower to be the new leader
- for (int i = 0; i < SERVER_COUNT; i++) {
- if (i != leader) {
- // Recreate a client session since the previous session was
not persisted.
- servers.restartClient(i, this);
- waitForOne(servers.zk[i], States.CONNECTED);
- }
- }
-
- // 5. send a create request to old leader and make sure it's synced to
disk,
+ // 3. send a create request to old leader and make sure it's synced to
disk,
// which means it acked from itself
try {
servers.zk[leader].create("/zk" + leader, "zk".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
- } catch (KeeperException e) {
+ } catch (KeeperException ignored) {
}
- // just make sure that we actually did get it in process at the
- // leader
+ // just make sure that we actually did get it in process at the leader
+ //
// there can be extra sessionClose proposals
- assertTrue(outstanding.size() > 0);
+ assertFalse(outstanding.isEmpty());
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull(p, "Old leader doesn't have 'create' proposal");
@@ -782,36 +771,73 @@ public class QuorumPeerMainTest extends
QuorumPeerTestBase {
sleepTime += 100;
}
- // 6. wait for the leader to quit due to not enough followers and come
back up as a part of the new quorum
- LOG.info("Waiting for leader {} to timeout followers", leader);
+ // 4. start up the followers to form a new quorum
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != leader) {
+ servers.mt[i].start();
+ }
+ }
+
+ // 5. wait for one of the followers to become the new leader
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i != leader) {
+ // Recreate a new client session to avoid ConnectionLoss as
connecting server is restarted.
+ servers.restartClient(i, this);
+ waitForOne(servers.zk[i], States.CONNECTED);
+ }
+ }
+
+ // 6. make sure new quorum does not replicate the failed txn
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ if (i == leader) {
+ continue;
+ }
+ assertNull(servers.zk[i].exists("/zk" + leader, false),
+ "server " + i + " should not have /zk" + leader);
+ }
+
+ // resume election to rejoin the cluster
+ servers.mt[leader].getQuorumPeer().setSuspended(false);
+
+ // 7. wait for the leader to quit due to not enough followers and come
back up as a part of the new quorum
+ LOG.info("Waiting for leader {} to timeout and rejoin as follower",
leader);
sleepTime = 0;
- Follower f = servers.mt[leader].main.quorumPeer.follower;
- while (f == null || !f.isRunning()) {
- if (sleepTime > LEADER_TIMEOUT_MS * 2) {
- fail("Took too long for old leader to time out "
+ while (servers.mt[leader].getQuorumPeer().getPeerState() !=
QuorumPeer.ServerState.FOLLOWING) {
+ if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2) {
+ fail("Took too long for old leader to time out and rejoin "
+
servers.mt[leader].main.quorumPeer.getPeerState());
}
Thread.sleep(100);
sleepTime += 100;
- f = servers.mt[leader].main.quorumPeer.follower;
}
int newLeader = servers.findLeader();
// make sure a different leader was elected
assertNotEquals(leader, newLeader);
- // 7. restart the previous leader to force it to replay the edits and
possibly come up in a bad state
- servers.mt[leader].shutdown();
- servers.mt[leader].start();
- // old client session can expire, restart it
+ // Now, all preconditions are met. Let's verify that the failed txn got
truncated in the whole cluster.
+
+ boolean restarted = false;
servers.restartClient(leader, this);
- waitForAll(servers, States.CONNECTED);
+ waitForOne(servers.zk[leader], States.CONNECTED);
+ while (true) {
+ // 7. make sure everything is consistent, that is the failed txn
got truncated in old leader.
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ assertNull(servers.zk[i].exists("/zk" + leader, false),
+ "server " + i + " should not have /zk" + leader);
+ }
- // 8. check the node exist in previous leader but not others
- // make sure everything is consistent
- for (int i = 0; i < SERVER_COUNT; i++) {
- assertNull(servers.zk[i].exists("/zk" + leader, false),
- "server " + i + " should not have /zk" + leader);
+ if (restarted) {
+ break;
+ }
+
+ // 8. make sure above holds after restart
+ servers.mt[leader].shutdown();
+ servers.mt[leader].start();
+ // old client session can expire, restart it
+ servers.restartClient(leader, this);
+ waitForAll(servers, States.CONNECTED);
+ restarted = true;
}
}