This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ac326a59e40 Improved stability of Ratis addMemberToGroup and
testThresholdSnapshot UT (#11717)
ac326a59e40 is described below
commit ac326a59e40185c867d9af7bfb5f2e4e8d8c372f
Author: Potato <[email protected]>
AuthorDate: Fri Dec 15 10:19:29 2023 +0800
Improved stability of Ratis addMemberToGroup and testThresholdSnapshot UT
(#11717)
---
.../iotdb/consensus/ratis/DiskGuardianTest.java | 3 ++-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 25 ++++++++++++----------
.../iotdb/consensus/ratis/RecoverReadTest.java | 4 ++--
.../apache/iotdb/consensus/ratis/TestUtils.java | 25 +++++++++++++++++-----
4 files changed, 38 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
index e4ca6b503c7..9c35b293975 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
@@ -88,11 +88,12 @@ public class DiskGuardianTest {
s.createLocalPeer(gid, members);
}
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
miniCluster.writeManySerial(0, 10);
Assert.assertFalse(hasSnapshot(gid));
JavaUtils.attemptUntilTrue(
() -> hasSnapshot(gid),
- 3,
+ 12,
TimeDuration.valueOf(5, TimeUnit.SECONDS),
"should take snapshot",
logger);
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index d04196523f5..293ae587e5f 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -37,7 +37,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -49,7 +48,6 @@ public class RatisConsensusTest {
private List<RatisConsensus> servers;
private List<IStateMachine> stateMachines;
private ConsensusGroup group;
- CountDownLatch latch;
private TestUtils.MiniCluster miniCluster;
private final ExecutorService writeExecutor =
Executors.newFixedThreadPool(2);
@@ -104,7 +102,7 @@ public class RatisConsensusTest {
servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
doConsensus(0, 10, 10);
}
@@ -127,10 +125,15 @@ public class RatisConsensusTest {
servers.get(2).createLocalPeer(group.getGroupId(), peers.subList(2, 3));
servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2));
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
+
+ for (int i = 0; i < 3; i++) {
+ if (servers.get(i).isLeaderReady(gid)) {
+ Assert.assertEquals(
+ 3, ((TestUtils.IntegerCounter)
stateMachines.get(i)).getConfiguration().size());
+ }
+ }
- Assert.assertEquals(
- 3, ((TestUtils.IntegerCounter)
stateMachines.get(0)).getConfiguration().size());
doConsensus(0, 10, 20);
}
@@ -144,7 +147,7 @@ public class RatisConsensusTest {
servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
doConsensus(0, 10, 10);
servers.get(0).transferLeader(gid, peers.get(0));
@@ -153,7 +156,7 @@ public class RatisConsensusTest {
servers.get(0).removeRemotePeer(gid, peers.get(2));
servers.get(2).deleteLocalPeer(gid);
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
doConsensus(0, 10, 20);
}
@@ -193,14 +196,14 @@ public class RatisConsensusTest {
servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
// 200 operation will trigger snapshot & purge
doConsensus(0, 200, 200);
miniCluster.stop();
miniCluster.restart();
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
doConsensus(0, 10, 210);
}
@@ -232,7 +235,7 @@ public class RatisConsensusTest {
servers.get(1).createLocalPeer(gid, peers.subList(1, 2));
servers.get(0).addRemotePeer(gid, peers.get(1));
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElectedAndReady();
doConsensus(1, 10, 20);
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 83e44d27850..8dbd880f0dc 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -175,7 +175,7 @@ public class RecoverReadTest {
miniCluster.restart();
// wait an active leader to serve linearizable read requests
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElected();
Assert.assertEquals(10, miniCluster.mustRead(0));
}
@@ -231,7 +231,7 @@ public class RecoverReadTest {
miniCluster.restart();
// wait until active leader to serve read index requests
- miniCluster.waitUntilActiveLeader();
+ miniCluster.waitUntilActiveLeaderElected();
// query during redo: get exception that ratis is under recovery
Assert.assertThrows(RatisReadUnavailableException.class, () ->
miniCluster.readThrough(0));
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 275097796ba..1fd27ccc136 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -65,9 +65,11 @@ import java.util.function.Function;
import java.util.function.Supplier;
public class TestUtils {
+
private static final Logger logger =
LoggerFactory.getLogger(TestUtils.class);
public static class TestDataSet implements DataSet {
+
private int number;
public void setNumber(int number) {
@@ -80,6 +82,7 @@ public class TestUtils {
}
public static class TestRequest implements IConsensusRequest {
+
private final int cmd;
public TestRequest(ByteBuffer buffer) {
@@ -113,6 +116,7 @@ public class TestUtils {
}
public static class IntegerCounter implements IStateMachine,
IStateMachine.EventApi {
+
protected AtomicInteger integer;
private final Logger logger =
LoggerFactory.getLogger(IntegerCounter.class);
private List<Peer> configuration;
@@ -214,6 +218,7 @@ public class TestUtils {
/** A Mini Raft CLuster Wrapper for Test Env. */
static class MiniCluster {
+
private final ConsensusGroupId gid;
private final int replicas;
private final List<Peer> peers;
@@ -341,12 +346,21 @@ public class TestUtils {
return group;
}
- void waitUntilActiveLeader() throws InterruptedException {
+ void waitUntilActiveLeaderElected() throws InterruptedException {
JavaUtils.attemptUntilTrue(
- () -> getServer(0).getLeader(gid) != null,
- 100,
+ () -> servers.stream().anyMatch(server -> server.isLeader(gid)),
+ 600,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
- "wait leader",
+ "wait leader elected",
+ null);
+ }
+
+ void waitUntilActiveLeaderElectedAndReady() throws InterruptedException {
+ JavaUtils.attemptUntilTrue(
+ () -> servers.stream().anyMatch(server -> server.isLeaderReady(gid)),
+ 600,
+ TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
+ "wait leader elected and become ready",
null);
}
@@ -395,7 +409,7 @@ public class TestUtils {
int mustRead(int serverIndex) throws InterruptedException {
final ByteBufferConsensusRequest readRequest =
TestUtils.TestRequest.getRequest();
- waitUntilActiveLeader();
+ waitUntilActiveLeaderElectedAndReady();
final TimeDuration maxTryDuration = TimeDuration.valueOf(3,
TimeUnit.MINUTES);
final TimeDuration waitDuration = TimeDuration.valueOf(1000,
TimeUnit.MILLISECONDS);
@@ -430,6 +444,7 @@ public class TestUtils {
}
static class MiniClusterFactory {
+
private final int replicas = 3;
private ConsensusGroupId gid = new DataRegionId(1);
private final Function<Integer, File> peerStorageProvider =