Repository: incubator-ratis Updated Branches: refs/heads/master 0b7337083 -> cce03d04b
RATIS-371. Change tests so that they do not depend on consecutive log indices. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cce03d04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cce03d04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cce03d04 Branch: refs/heads/master Commit: cce03d04bb7ff7670345bbd1e1574c0fdd0d6751 Parents: 0b73370 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Oct 25 06:58:18 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Oct 25 06:58:18 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/TestRestartRaftPeer.java | 106 ----------- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 9 +- .../ratis/grpc/TestServerRestartWithGrpc.java | 25 +++ .../ratis/netty/TestServerRestartWithNetty.java | 25 +++ .../java/org/apache/ratis/RaftAsyncTests.java | 6 +- .../java/org/apache/ratis/RaftTestUtil.java | 18 +- .../java/org/apache/ratis/RetryCacheTests.java | 19 +- .../org/apache/ratis/WatchRequestTests.java | 188 ++++++++++--------- .../apache/ratis/server/ServerRestartTests.java | 110 +++++++++++ .../apache/ratis/server/TestRaftLogMetrics.java | 69 +++---- .../impl/RaftReconfigurationBaseTest.java | 13 +- .../ratis/server/impl/RaftServerTestUtil.java | 11 +- .../impl/RaftStateMachineExceptionTests.java | 9 +- .../server/impl/StateMachineShutdownTests.java | 2 +- .../TestServerRestartWithSimulatedRpc.java | 25 +++ .../server/storage/RaftStorageTestUtils.java | 9 +- .../statemachine/RaftSnapshotBaseTest.java | 75 +++++--- .../SimpleStateMachine4Testing.java | 78 +++++--- 18 files changed, 472 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java deleted file mode 100644 index ccbbda0..0000000 --- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis; - -import org.apache.log4j.Level; -import org.apache.ratis.RaftTestUtil.SimpleMessage; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.examples.ParameterizedBaseTest; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.storage.RaftLog; -import org.apache.ratis.statemachine.SimpleStateMachine4Testing; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; - -/** - * Test restarting raft peers. - */ -@RunWith(Parameterized.class) -public class TestRestartRaftPeer extends BaseTest { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - @Parameterized.Parameters - public static Collection<Object[]> data() throws IOException { - RaftProperties prop = new RaftProperties(); - prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); - return ParameterizedBaseTest.getMiniRaftClusters(prop, 3); - } - - @Parameterized.Parameter - public MiniRaftCluster cluster; - - @Test - public void restartFollower() throws Exception { - cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); - - // write some messages - final byte[] content = new byte[1024]; - Arrays.fill(content, (byte) 1); - final SimpleMessage message = new SimpleMessage(new String(content)); - for (int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - - // restart a follower - RaftPeerId followerId = cluster.getFollowers().get(0).getId(); - LOG.info("Restart follower {}", followerId); - cluster.restartServer(followerId, false); - - // write some more messages - for (int i = 0; i < 10; i++) { - Assert.assertTrue(client.send(message).isSuccess()); - } - client.close(); - - // make sure the restarted follower can catchup - boolean catchup = false; - long lastAppliedIndex = 0; - for (int i = 0; i < 10 && !catchup; i++) { - Thread.sleep(500); - lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex(); - catchup = lastAppliedIndex >= 20; - } - Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup); - - // make sure the restarted peer's log segments is correct - cluster.restartServer(followerId, false); - Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() - .getLastEntryTermIndex().getIndex() >= 20); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 7ae385d..d98be53 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -92,16 +92,17 @@ public class TestRaftWithGrpc Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), index); if (!raftServer.isLeader()) { TermIndex[] serverEntries = raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE); - Arrays.equals(serverEntries, leaderEntries); + Assert.assertArrayEquals(serverEntries, leaderEntries); } }); // Wait for heartbeats from leader to be received by followers - Thread.sleep(1000); + Thread.sleep(500); RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> { // FollowerInfo in the leader state should have updated next and match index. - Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 1); - Assert.assertEquals(logAppender.getFollower().getNextIndex(), index); + final long followerMatchIndex = logAppender.getFollower().getMatchIndex(); + Assert.assertTrue(followerMatchIndex >= index - 1); + Assert.assertEquals(followerMatchIndex + 1, logAppender.getFollower().getNextIndex()); }); } cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java new file mode 100644 index 0000000..682b3ba --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.ServerRestartTests; + +public class TestServerRestartWithGrpc + extends ServerRestartTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java new file mode 100644 index 0000000..15dc688 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.server.ServerRestartTests; + +public class TestServerRestartWithNetty + extends ServerRestartTests<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index f79eb6b..c14515c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -192,7 +192,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba // submit some messages final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(); for (int i = 0; i < numMesssages; i++) { - final String s = "m" + i; + final String s = "" + i; LOG.info("sendAsync " + s); futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s))); } @@ -218,12 +218,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba // test a failure case testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index", () -> client.sendStaleReadAsync( - new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)), + new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE), followerCommitInfo.getCommitIndex(), follower), StateMachineException.class, IndexOutOfBoundsException.class); // test sendStaleReadAsync - for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) { + for (int i = 0; i < numMesssages; i++) { final int query = i; LOG.info("sendStaleReadAsync, query=" + query); final Message message = new RaftTestUtil.SimpleMessage("" + query); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 60629f9..5946a47 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -113,11 +113,14 @@ public interface RaftTestUtil { return leader != null ? leader.getId().toString() : null; } - static boolean logEntriesContains(RaftLog log, - SimpleMessage... expectedMessages) { + static boolean logEntriesContains(RaftLog log, SimpleMessage... expectedMessages) { + return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages); + } + + static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) { int idxEntries = 0; int idxExpected = 0; - TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE); + TermIndex[] termIndices = log.getEntries(startIndex, endIndex); while (idxEntries < termIndices.length && idxExpected < expectedMessages.length) { try { @@ -376,4 +379,13 @@ public interface RaftTestUtil { } }).start(); } + + static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception { + Assert.assertEquals(expected.getLastEntryTermIndex(), computed.getLastEntryTermIndex()); + final long lastIndex = expected.getNextIndex() - 1; + Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), lastIndex); + for(long i = 0; i < lastIndex; i++) { + Assert.assertEquals(expected.get(i), computed.get(i)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index 9fdb4f7..c962481 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -29,6 +29,8 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.server.storage.RaftLogIOException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -36,6 +38,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Arrays; +import java.util.stream.LongStream; import static java.util.Arrays.asList; @@ -110,10 +113,21 @@ public abstract class RetryCacheTests extends BaseTest { Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server)); Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId)); // make sure there is only one log entry committed - Assert.assertEquals(oldLastApplied + 1, server.getState().getLastAppliedIndex()); + Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied + 1)); } } + static int count(RaftLog log, long startIndex) throws RaftLogIOException { + final long nextIndex = log.getNextIndex(); + int count = 0; + for(long i = startIndex; i < nextIndex; i++) { + if (log.get(i).hasStateMachineLogEntry()) { + count++; + } + } + return count; + } + /** * Test retry while the leader changes to another peer */ @@ -158,8 +172,7 @@ public abstract class RetryCacheTests extends BaseTest { } // check the new leader and make sure the retry did not get committed - Assert.assertEquals(oldLastApplied + 3, - cluster.getLeader().getState().getLastAppliedIndex()); + Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1)); client.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index 9ff27ad..d1cb7e0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -71,7 +73,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } static class TestParameters { - final long startLogIndex; final int numMessages; final RaftClient writeClient; final RaftClient watchMajorityClient; @@ -81,12 +82,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final MiniRaftCluster cluster; final Logger log; - TestParameters( - long startLogIndex, int numMessages, RaftClient writeClient, + TestParameters(int numMessages, RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger log) { - this.startLogIndex = startLogIndex; this.numMessages = numMessages; this.writeClient = writeClient; this.watchMajorityClient = watchMajorityClient; @@ -97,9 +96,31 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> this.log = log; } + void sendRequests(List<CompletableFuture<RaftClientReply>> replies, + List<CompletableFuture<WatchReplies>> watches) { + for(int i = 0; i < numMessages; i++) { + final String message = "m" + i; + log.info("SEND_REQUEST {}: message={}", i, message); + final CompletableFuture<RaftClientReply> replyFuture = writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)); + replies.add(replyFuture); + final CompletableFuture<WatchReplies> watchFuture = new CompletableFuture<>(); + watches.add(watchFuture); + replyFuture.thenAccept(reply -> { + final long logIndex = reply.getLogIndex(); + log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex); + watchFuture.complete(new WatchReplies(logIndex, + watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY), + watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL), + watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED), + watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED) + )); + }); + } + } + @Override public String toString() { - return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages; + return "numMessages=" + numMessages; } } @@ -119,10 +140,9 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) { final int[] numMessages = {1, 10, 100}; for(int i = 0; i < 5; i++) { - final long logIndex = getLogIndex(writeClient) + 1; final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)]; final TestParameters p = new TestParameters( - logIndex, n, writeClient, watchMajorityClient, watchAllClient, + n, writeClient, watchMajorityClient, watchAllClient, watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG); LOG.info("{}) {}, {}", i, p, cluster.printServers()); testCase.apply(p); @@ -131,18 +151,29 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } - static Void runTestWatchRequestAsync(TestParameters p) throws Exception { - runTestWatchRequestAsync(p.startLogIndex, p.numMessages, - p.writeClient, p.watchMajorityClient, p.watchAllClient, - p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); - return null; + static class WatchReplies { + private final long logIndex; + private final CompletableFuture<RaftClientReply> majority; + private final CompletableFuture<RaftClientReply> all; + private final CompletableFuture<RaftClientReply> majorityCommitted; + private final CompletableFuture<RaftClientReply> allCommitted; + + WatchReplies(long logIndex, + CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all, + CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) { + this.logIndex = logIndex; + this.majority = majority; + this.all = all; + this.majorityCommitted = majorityCommitted; + this.allCommitted = allCommitted; + } } - static void runTestWatchRequestAsync( - long startLogIndex, int numMessages, - RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, - RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, - MiniRaftCluster cluster, Logger LOG) throws Exception { + static Void runTestWatchRequestAsync(TestParameters p) throws Exception { + final Logger LOG = p.log; + final MiniRaftCluster cluster = p.cluster; + final int numMessages = p.numMessages; + // blockStartTransaction of the leader so that no transaction can be committed MAJORITY final RaftServerImpl leader = cluster.getLeader(); LOG.info("block leader {}", leader.getId()); @@ -156,52 +187,35 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> // send a message final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); + final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>(); - for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; - final String message = "m" + logIndex; - LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); - replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); - watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); - watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); - watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync( - logIndex, ReplicationLevel.MAJORITY_COMMITTED)); - watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); - } + p.sendRequests(replies, watches); Assert.assertEquals(numMessages, replies.size()); - Assert.assertEquals(numMessages, watchMajoritys.size()); - Assert.assertEquals(numMessages, watchAlls.size()); - Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); - Assert.assertEquals(numMessages, watchAllCommitteds.size()); + Assert.assertEquals(numMessages, watches.size()); // since leader is blocked, nothing can be done. TimeUnit.SECONDS.sleep(1); assertNotDone(replies); - assertNotDone(watchMajoritys); - assertNotDone(watchAlls); - assertNotDone(watchMajorityCommitteds); - assertNotDone(watchAllCommitteds); + assertNotDone(watches); // unblock leader so that the transaction can be committed. SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); LOG.info("unblock leader {}", leader.getId()); for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; - LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex); final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final long logIndex = reply.getLogIndex(); + LOG.info("{}: receive reply for logIndex={}", i, logIndex); Assert.assertTrue(reply.isSuccess()); - Assert.assertEquals(logIndex, reply.getLogIndex()); - final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + + final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertEquals(logIndex, watchReplies.logIndex); + final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); - Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + Assert.assertTrue(watchMajorityReply.isSuccess()); final RaftClientReply watchMajorityCommittedReply - = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos @@ -219,22 +233,25 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } + Assert.assertEquals(numMessages, watches.size()); + // but not replicated/committed to all. TimeUnit.SECONDS.sleep(1); - assertNotDone(watchAlls); - assertNotDone(watchAllCommitteds); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); // unblock follower so that the transaction can be replicated and committed to all. LOG.info("unblock follower {}", blockedFollower.getId()); SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; + final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final long logIndex = watchReplies.logIndex; LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); Assert.assertTrue(watchAllReply.isSuccess()); - final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos @@ -243,9 +260,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); } } + return null; } static <T> void assertNotDone(List<CompletableFuture<T>> futures) { + assertNotDone(futures.stream()); + } + + static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) { futures.forEach(f -> { if (f.isDone()) { try { @@ -267,65 +289,44 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { - runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages, - p.writeClient, p.watchMajorityClient, p.watchAllClient, - p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); - return null; - } + final Logger LOG = p.log; + final MiniRaftCluster cluster = p.cluster; + final int numMessages = p.numMessages; - static void runTestWatchRequestAsyncChangeLeader( - long startLogIndex, int numMessages, - RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, - RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, - MiniRaftCluster cluster, Logger LOG) throws Exception { // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED final List<RaftServerImpl> followers = cluster.getFollowers(); final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size())); LOG.info("block follower {}", blockedFollower.getId()); SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData(); - // send a message final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); - final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); + final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>(); - for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; - final String message = "m" + logIndex; - LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); - replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); - watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); - watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); - watchMajorityCommitteds.add( - watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED)); - watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); - } + p.sendRequests(replies, watches); Assert.assertEquals(numMessages, replies.size()); - Assert.assertEquals(numMessages, watchMajoritys.size()); - Assert.assertEquals(numMessages, watchAlls.size()); - Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); - Assert.assertEquals(numMessages, watchAllCommitteds.size()); + Assert.assertEquals(numMessages, watches.size()); // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED. for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; - LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex); final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final long logIndex = reply.getLogIndex(); + LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex); Assert.assertTrue(reply.isSuccess()); - Assert.assertEquals(logIndex, reply.getLogIndex()); - final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + + final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertEquals(logIndex, watchReplies.logIndex); + final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); - Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + Assert.assertTrue(watchMajorityReply.isSuccess()); final RaftClientReply watchMajorityCommittedReply - = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); + LOG.info("commitInfos=" + commitInfos); Assert.assertEquals(NUM_SERVERS, commitInfos.size()); // One follower has not committed, so min must be less than logIndex @@ -339,8 +340,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } TimeUnit.SECONDS.sleep(1); - assertNotDone(watchAlls); - assertNotDone(watchAllCommitteds); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); // Now change leader RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId()); @@ -349,13 +350,14 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); LOG.info("unblock follower {}", blockedFollower.getId()); for(int i = 0; i < numMessages; i++) { - final long logIndex = startLogIndex + i; + final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final long logIndex = watchReplies.logIndex; LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); Assert.assertTrue(watchAllReply.isSuccess()); - final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos @@ -364,6 +366,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); } } + return null; } - } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java new file mode 100644 index 0000000..5353caa --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server; + +import org.apache.log4j.Level; +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerState; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.SizeInBytes; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Test restarting raft peers. + */ +public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); + } + + static final int NUM_SERVERS = 3; + + @Before + public void setup() { + final RaftProperties prop = getProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB")); + } + + @Test + public void testRestartFollower() throws Exception { + try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { + runTestRestartFollower(cluster, LOG); + } + } + + static void runTestRestartFollower(MiniRaftCluster cluster, Logger LOG) throws Exception { + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + final RaftClient client = cluster.createClient(leaderId); + + // write some messages + final byte[] content = new byte[1024]; + Arrays.fill(content, (byte)1); + final SimpleMessage message = new SimpleMessage(new String(content)); + for(int i = 0; i < 10; i++) { + Assert.assertTrue(client.send(message).isSuccess()); + } + + // restart a follower + RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + LOG.info("Restart follower {}", followerId); + cluster.restartServer(followerId, false); + + // write some more messages + for(int i = 0; i < 10; i++) { + Assert.assertTrue(client.send(message).isSuccess()); + } + client.close(); + + final long leaderLastIndex = cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex(); + // make sure the restarted follower can catchup + final ServerState followerState = cluster.getRaftServerImpl(followerId).getState(); + JavaUtils.attempt(() -> followerState.getLastAppliedIndex() >= leaderLastIndex, + 10, 500, "follower catchup", LOG); + + // make sure the restarted peer's log segments is correct + cluster.restartServer(followerId, false); + Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog() + .getLastEntryTermIndex().getIndex() >= leaderLastIndex); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java index 978800d..9cc60a6 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java @@ -20,61 +20,67 @@ package org.apache.ratis.server; import com.codahale.metrics.Timer; import org.apache.log4j.Level; +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.RatisMetricsRegistry; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.server.storage.RaftStorageTestUtils; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.util.LogUtils; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import javax.management.ObjectName; -import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; -public class TestRaftLogMetrics { +public class TestRaftLogMetrics extends BaseTest + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public static final int NUM_SERVERS = 3; - protected static final RaftProperties properties = new RaftProperties(); - - private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc - .FACTORY.newCluster(NUM_SERVERS, getProperties()); - - public RaftProperties getProperties() { - return properties; + { + getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + MetricsStateMachine.class, StateMachine.class); } - @Before - public void setup() throws IOException { - Assert.assertNull(cluster.getLeader()); - cluster.start(); - } + static class MetricsStateMachine extends BaseStateMachine { + static MetricsStateMachine get(RaftServerImpl s) { + return (MetricsStateMachine)s.getStateMachine(); + } + + private final AtomicInteger flushCount = new AtomicInteger(); - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); + int getFlushCount() { + return flushCount.get(); } - } - private String getLogFlushTimeMetric(String serverId) { - return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.") - .append(serverId).append(".flush-time").toString(); + @Override + public CompletableFuture<Void> flushStateMachineData(long index) { + flushCount.incrementAndGet(); + return super.flushStateMachineData(index); + } } @Test public void testFlushMetric() throws Exception { + try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestFlushMetric(cluster); + } + } + + static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception { int numMsg = 2; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg); @@ -85,22 +91,21 @@ public class TestRaftLogMetrics { } for (RaftServerProxy rsp: cluster.getServers()) { - String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString()); + final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId()); Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric); Assert.assertNotNull(tm); - // Number of log entries expected = numMsg + 1 entry for start-log-segment - int numExpectedLogEntries = numMsg + 1; + final MetricsStateMachine stateMachine = MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId())); + final int expectedFlush = stateMachine.getFlushCount(); - Assert.assertEquals(numExpectedLogEntries, tm.getCount()); + Assert.assertEquals(expectedFlush, tm.getCount()); Assert.assertTrue(tm.getMeanRate() > 0); // Test jmx ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric); - Assert.assertEquals(numExpectedLogEntries, + Assert.assertEquals(expectedFlush, ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count")) .intValue()); } } - } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index e9651d6..246a9a2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -61,8 +61,6 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { protected static final RaftProperties prop = new RaftProperties(); - private static final ClientId clientId = ClientId.randomId(); - static final int STAGING_CATCHUP_GAP = 10; @BeforeClass public static void setup() { @@ -416,17 +414,16 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { cluster.start(); RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); + final RaftServerImpl leader = cluster.getLeader(); + final RaftClient client = cluster.createClient(leader.getId()); client.send(new SimpleMessage("m")); - final long committedIndex = cluster.getLeader().getState().getLog() - .getLastCommittedIndex(); + final RaftLog leaderLog = leader.getState().getLog(); + final long committedIndex = leaderLog.getLastCommittedIndex(); final RaftConfiguration confBefore = cluster.getLeader().getRaftConf(); // no real configuration change in the request - RaftClientReply reply = client.setConfiguration(cluster.getPeers() - .toArray(new RaftPeer[0])); + final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray())); Assert.assertTrue(reply.isSuccess()); Assert.assertEquals(committedIndex, cluster.getLeader().getState() .getLog().getLastCommittedIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index bcfaf01..827117e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -41,8 +41,7 @@ public class RaftServerTestUtil { 3, sleepMs, "waitAndCheckNewConf", LOG); } private static void waitAndCheckNewConf(MiniRaftCluster cluster, - RaftPeer[] peers, Collection<String> deadPeers) - throws Exception { + RaftPeer[] peers, Collection<String> deadPeers) { LOG.info(cluster.printServers()); Assert.assertNotNull(cluster.getLeader()); @@ -61,9 +60,11 @@ public class RaftServerTestUtil { numIncluded++; Assert.assertTrue(server.getRaftConf().isStable()); Assert.assertTrue(server.getRaftConf().hasNoChange(peers)); - } else { - Assert.assertFalse(server.getId() + " is still running: " + server, - server.isAlive()); + } else if (server.isAlive()) { + // The server is successfully removed from the conf + // It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf). + Assert.assertTrue(server.getRaftConf().isStable()); + Assert.assertFalse(server.getRaftConf().containsInConf(server.getId())); } } Assert.assertEquals(peers.length, numIncluded + deadIncluded); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java index ec635d0..cf3a490 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java @@ -21,6 +21,7 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.RaftProperties; @@ -107,8 +108,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; final long seqNum = 111; - RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, - callId, seqNum, new RaftTestUtil.SimpleMessage("message")); + final SimpleMessage message = new SimpleMessage("message"); + final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message); RaftClientReply reply = rpc.sendRequest(r); Assert.assertFalse(reply.isSuccess()); Assert.assertNotNull(reply.getStateMachineException()); @@ -131,8 +132,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu } Assert.assertNotNull( RaftServerTestUtil.getRetryEntry(server, client.getId(), callId)); - Assert.assertEquals(oldLastApplied + 1, - server.getState().getLastAppliedIndex()); + final RaftLog log = server.getState().getLog(); + RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message); } client.close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java index a66cf70..e566700 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java @@ -97,7 +97,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster> RaftClientReply watchReply = client.sendWatch( logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED); watchReply.getCommitInfos().forEach( - val -> Assert.assertEquals(val.getCommitIndex(), logIndex)); + val -> Assert.assertTrue(val.getCommitIndex() >= logIndex)); RaftServerImpl secondFollower = cluster.getFollowers().get(1); // Second follower is blocked in apply transaction http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java new file mode 100644 index 0000000..306e5e7 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.server.ServerRestartTests; + +public class TestServerRestartWithSimulatedRpc + extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java index ad8308e..e681b66 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java @@ -18,6 +18,8 @@ package org.apache.ratis.server.storage; import org.apache.log4j.Level; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.AutoCloseableLock; @@ -30,6 +32,10 @@ public interface RaftStorageTestUtils { LogUtils.setLogLevel(RaftLogWorker.LOG, level); } + static String getLogFlushTimeMetric(RaftPeerId serverId) { + return RaftLogWorker.class.getName() + "." + serverId + ".flush-time"; + } + static void printLog(RaftLog log, Consumer<String> println) { if (log == null) { println.accept("log == null"); @@ -50,8 +56,7 @@ public interface RaftStorageTestUtils { b.append(i == committed? 'c': ' '); b.append(String.format("%3d: ", i)); try { - final RaftProtos.LogEntryProto entry = log.get(i); - b.append(entry != null? entry.getLogEntryBodyCase(): null); + b.append(ServerProtoUtils.toLogEntryString(log.get(i))); } catch (RaftLogIOException e) { b.append(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 7a326a3..0a5e38d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -33,7 +33,9 @@ import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftStorageDirectory; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.junit.After; import org.junit.Assert; @@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; public abstract class RaftSnapshotBaseTest extends BaseTest { static { @@ -56,25 +60,31 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class); private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10; - static File getSnapshotFile(MiniRaftCluster cluster, int i) { + static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) { final RaftServerImpl leader = cluster.getLeader(); - final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader); - return sm.getStateMachineStorage().getSnapshotFile( - leader.getState().getCurrentTerm(), i); + final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage(); + final long term = leader.getState().getCurrentTerm(); + return LongStream.range(startIndex, endIndex) + .mapToObj(i -> storage.getSnapshotFile(term, i)) + .collect(Collectors.toList()); } - static void assertLeaderContent(MiniRaftCluster cluster) - throws InterruptedException { + + static void assertLeaderContent(MiniRaftCluster cluster) throws Exception { final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); - Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2, - leader.getState().getLog().getLastCommittedIndex()); - final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); + final RaftLog leaderLog = leader.getState().getLog(); + final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex(); + final LogEntryProto e = leaderLog.get(lastIndex); - for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { - Assert.assertEquals(i+1, entries[i].getIndex()); - Assert.assertArrayEquals( - new SimpleMessage("m" + i).getContent().toByteArray(), - entries[i].getStateMachineLogEntry().getLogData().toByteArray()); + final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent(); + long message = 0; + for (int i = 0; i < entries.length; i++) { + LOG.info("{}) {} {}", i, message, entries[i]); + if (entries[i].hasStateMachineLogEntry()) { + final SimpleMessage m = new SimpleMessage("m" + message++); + Assert.assertArrayEquals(m.getContent().toByteArray(), + entries[i].getStateMachineLogEntry().getLogData().toByteArray()); + } } } @@ -118,15 +128,12 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { } } + final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex(); + LOG.info("nextIndex = {}", nextIndex); // wait for the snapshot to be done - final File snapshotFile = getSnapshotFile(cluster, i); - - int retries = 0; - do { - Thread.sleep(1000); - } while (!snapshotFile.exists() && retries++ < 10); - - Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); + final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); + JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists), + 10, 1000, "snapshotFile.exist", LOG); // restart the peer and check if it can correctly load snapshot cluster.restart(false); @@ -138,6 +145,14 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { } } + static boolean exists(File f) { + if (f.exists()) { + LOG.info("File exists: " + f); + return true; + } + return false; + } + /** * Basic test for install snapshot: start a one node cluster and let it * generate a snapshot. Then delete the log and restart the node, and add more @@ -145,7 +160,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { */ @Test public void testBasicInstallSnapshot() throws Exception { - List<LogPathAndIndex> logs; + final List<LogPathAndIndex> logs; try { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); @@ -161,15 +176,13 @@ public abstract class RaftSnapshotBaseTest extends BaseTest { // wait for the snapshot to be done RaftStorageDirectory storageDirectory = cluster.getLeader().getState() .getStorage().getStorageDir(); - final File snapshotFile = getSnapshotFile(cluster, i); - logs = storageDirectory.getLogSegmentFiles(); - - int retries = 0; - do { - Thread.sleep(1000); - } while (!snapshotFile.exists() && retries++ < 10); - Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists()); + final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex(); + LOG.info("nextIndex = {}", nextIndex); + final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); + JavaUtils.attempt(() -> snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists), + 10, 1000, "snapshotFile.exist", LOG); + logs = storageDirectory.getLogSegmentFiles(); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cce03d04/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 9a7267b..313e713 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -20,13 +20,15 @@ package org.apache.ratis.statemachine; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.io.MD5Hash; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -34,22 +36,29 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogInputStream; import org.apache.ratis.server.storage.LogOutputStream; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.statemachine.impl.TransactionContextImpl; -import org.apache.ratis.util.*; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.Daemon; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MD5FileUtil; +import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * A {@link StateMachine} implementation example that simply stores all the log @@ -68,8 +77,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return (SimpleStateMachine4Testing)s.getStateMachine(); } - private final List<LogEntryProto> list = - Collections.synchronizedList(new ArrayList<>()); + private final SortedMap<Long, LogEntryProto> indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final SortedMap<String, LogEntryProto> dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); private final Daemon checkpointer; private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final RaftProperties properties = new RaftProperties(); @@ -119,14 +128,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { public SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { while (running) { - if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >= - SNAPSHOT_THRESHOLD) { - endIndexLastCkpt = takeSnapshot(); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) { - } + if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) { + endIndexLastCkpt = takeSnapshot(); + } + + try { + TimeUnit.SECONDS.sleep(1); + } catch(InterruptedException ignored) { + } } }); } @@ -139,6 +148,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return leaderElectionTimeoutInfo; } + private void put(LogEntryProto entry) { + final LogEntryProto previous = indexMap.put(entry.getIndex(), entry); + Preconditions.assertNull(previous, "previous"); + dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), entry); + } + @Override public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { @@ -171,7 +186,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry()); - list.add(entry); + put(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); return CompletableFuture.completedFuture( new SimpleMessage(entry.getIndex() + " OK")); @@ -192,7 +207,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { termIndex.getIndex(), snapshotFile); try (LogOutputStream out = new LogOutputStream(snapshotFile, false, segmentMaxSize, preallocatedSize, bufferSize)) { - for (final LogEntryProto entry : list) { + for (final LogEntryProto entry : indexMap.values()) { if (entry.getIndex() > endIndex) { break; } else { @@ -241,13 +256,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { - list.add(entry); + put(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); } } Preconditions.assertTrue( - !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(), - "endIndex=%s, list=%s", endIndex, list); + !indexMap.isEmpty() && endIndex == indexMap.lastKey(), + "endIndex=%s, indexMap=%s", endIndex, indexMap); this.endIndexLastCkpt = endIndex; setLastAppliedTermIndex(snapshot.getTermIndex()); this.storage.loadLatestSnapshot(); @@ -264,18 +279,21 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { */ @Override public CompletableFuture<Message> query(Message request) { - final ByteString bytes = request.getContent(); + final String string = request.getContent().toStringUtf8(); + Exception exception; try { - final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex() - : Long.parseLong(bytes.toStringUtf8()); - LOG.info("query log index " + index); - final LogEntryProto entry = list.get(Math.toIntExact(index - 1)); - return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); + LOG.info("query " + string); + final LogEntryProto entry = dataMap.get(string); + if (entry != null) { + return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); + } + exception = new IndexOutOfBoundsException("Log entry not found for query " + string); } catch (Exception e) { LOG.warn("Failed request " + request, e); - return JavaUtils.completeExceptionally(new StateMachineException( - "Failed request " + request, e)); + exception = e; } + return JavaUtils.completeExceptionally(new StateMachineException( + "Failed request " + request, exception)); } static final ByteString STATE_MACHINE_DATA = ByteString.copyFromUtf8("StateMachine Data"); @@ -314,7 +332,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } public LogEntryProto[] getContent() { - return list.toArray(new LogEntryProto[list.size()]); + return indexMap.values().toArray(new LogEntryProto[0]); } public void blockStartTransaction() {
