Repository: incubator-ratis Updated Branches: refs/heads/master 9b2d7b65c -> 940a169ba
RATIS-347. Follow up works for RATIS-234. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/940a169b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/940a169b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/940a169b Branch: refs/heads/master Commit: 940a169bac12b10eb636d62029dddbc9fad420ff Parents: 9b2d7b6 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Oct 12 11:16:48 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Oct 12 11:16:48 2018 +0800 ---------------------------------------------------------------------- .../apache/ratis/protocol/RaftClientReply.java | 8 +- .../java/org/apache/ratis/util/LongMinMax.java | 76 ++++++++ .../java/org/apache/ratis/util/TestMinMax.java | 57 ++++++ .../apache/ratis/server/impl/LeaderState.java | 28 +-- .../org/apache/ratis/server/impl/RoleInfo.java | 14 +- .../apache/ratis/server/storage/RaftLog.java | 9 +- .../ratis/server/storage/RaftLogWorker.java | 1 + .../org/apache/ratis/WatchRequestTests.java | 178 +++++++++++++++---- 8 files changed, 317 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 4c290ff..0ec9f75 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -18,6 +18,7 @@ package org.apache.ratis.protocol; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; @@ -25,8 +26,6 @@ import org.apache.ratis.util.ReflectionUtils; import java.util.Collection; import java.util.Collections; -import java.util.stream.Collector; -import java.util.stream.Collectors; /** * Reply from server to client @@ -43,6 +42,11 @@ public class RaftClientReply extends RaftClientMessage { private final RaftException exception; private final Message message; + /** + * This field is the log index of the transaction + * if (1) the request is {@link RaftClientRequestProto.TypeCase#WRITE} and (2) the reply is success. + * Otherwise, this field is not used. + */ private final long logIndex; /** The commit information when the reply is created. */ private final Collection<CommitInfoProto> commitInfos; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java new file mode 100644 index 0000000..47bc00d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +/** + * Min and max values in long. + * + * This class is mutable. + * This class is NOT thread safe. + */ +public class LongMinMax { + private long min; + private long max; + private boolean initialized = false; + + /** @return the min */ + public long getMin() { + Preconditions.assertTrue(initialized, "This LongMinMax object is uninitialized."); + return min; + } + + /** @return the max */ + public long getMax() { + Preconditions.assertTrue(initialized, "This LongMinMax object is uninitialized."); + return max; + } + + public boolean isInitialized() { + return initialized; + } + + /** Update min and max with the given number. */ + public void accumulate(long n) { + if (!initialized) { + min = max = n; + initialized = true; + } else if (n < min) { + min = n; + } else if (n > max) { + max = n; + } + } + + /** Combine that to this. 
*/ + public void combine(LongMinMax that) { + if (that.initialized) { + if (!this.initialized) { + this.min = that.min; + this.max = that.max; + this.initialized = true; + } else { + if (that.min < this.min) { + this.min = that.min; + } + if (that.max > this.max) { + this.max = that.max; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java new file mode 100644 index 0000000..8d315b7 --- /dev/null +++ b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.LongStream; + +public class TestMinMax { + @Test(timeout = 1000) + public void testMinMax() { + runTestMinMax(LongStream.empty()); + runTestMinMax(LongStream.iterate(0, n -> n).limit(10)); + for(int count = 1; count < 10; count++) { + runTestMinMax(LongStream.iterate(1, n -> n + 1).limit(count)); + } + for(int count = 1; count < 10; count++) { + runTestMinMax(LongStream.iterate(0, _dummy -> ThreadLocalRandom.current().nextLong()).limit(count)); + } + } + + static void runTestMinMax(LongStream stream) { + final List<Long> list = stream.collect(ArrayList::new, List::add, List::addAll); + final LongMinMax longMinMax = toLongStream(list).collect(LongMinMax::new, LongMinMax::accumulate, LongMinMax::combine); + if (longMinMax.isInitialized()) { + Assert.assertEquals(toLongStream(list).min().getAsLong(), longMinMax.getMin()); + Assert.assertEquals(toLongStream(list).max().getAsLong(), longMinMax.getMax()); + } else { + Assert.assertEquals(OptionalLong.empty(), toLongStream(list).min()); + Assert.assertEquals(OptionalLong.empty(), toLongStream(list).max()); + } + } + + static LongStream toLongStream(List<Long> list) { + return list.stream().mapToLong(Long::longValue); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 48f0c1c..c647530 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ 
-139,6 +139,10 @@ public class LeaderState { this.senders = new CopyOnWriteArrayList<>(senders); } + int size() { + return senders.size(); + } + Stream<LogAppender> stream() { return senders.stream(); } @@ -300,16 +304,15 @@ public class LeaderState { } void commitIndexChanged() { - final long leader = raftLog.getLastCommittedIndex(); - final long min = senders.stream() + final LongMinMax minMax = senders.stream() .map(LogAppender::getFollower) - .map(FollowerInfo::getCommitIndex) - .min(Long::compare) - .orElse(leader); // it happens only if senders.isEmpty() - Preconditions.assertTrue(leader >= min); // leader commit index should always be ahead followers - - watchRequests.update(ReplicationLevel.MAJORITY, leader); - watchRequests.update(ReplicationLevel.ALL_COMMITTED, min); + .mapToLong(FollowerInfo::getCommitIndex) + .collect(LongMinMax::new, LongMinMax::accumulate, LongMinMax::combine); + minMax.accumulate(raftLog.getLastCommittedIndex()); + // Normally, the leader's commit index is always ahead of its followers'. + // However, after a leader change, the new leader's commit index may be behind some followers' in the beginning.
+ watchRequests.update(ReplicationLevel.MAJORITY, minMax.getMax()); + watchRequests.update(ReplicationLevel.ALL_COMMITTED, minMax.getMin()); } private void applyOldNewConf() { @@ -509,7 +512,7 @@ public class LeaderState { return; } - final long[] indicesInNewConf = computeCommittedIndices(followers, includeSelf); + final long[] indicesInNewConf = getSortedLogIndices(followers, includeSelf); final long majorityInNewConf = getMajority(indicesInNewConf); final long majority; final long min; @@ -524,7 +527,7 @@ public class LeaderState { return; } - final long[] indicesInOldConf = computeCommittedIndices(oldFollowers, includeSelfInOldConf); + final long[] indicesInOldConf = getSortedLogIndices(oldFollowers, includeSelfInOldConf); final long majorityInOldConf = getMajority(indicesInOldConf); majority = Math.min(majorityInNewConf, majorityInOldConf); min = Math.min(indicesInNewConf[0], indicesInOldConf[0]); @@ -537,6 +540,7 @@ public class LeaderState { final TermIndex[] entriesToCommit = raftLog.getEntries( oldLastCommitted + 1, majority + 1); if (server.getState().updateStatemachine(majority, currentTerm)) { + watchRequests.update(ReplicationLevel.MAJORITY, majority); commitIndexChanged(); } checkAndUpdateConfiguration(entriesToCommit); @@ -604,7 +608,7 @@ public class LeaderState { return indices[(indices.length - 1) / 2]; } - private long[] computeCommittedIndices(List<FollowerInfo> followers, boolean includeSelf) { + private long[] getSortedLogIndices(List<FollowerInfo> followers, boolean includeSelf) { final int length = includeSelf ? 
followers.size() + 1 : followers.size(); if (length == 0) { throw new IllegalArgumentException("followers.size() == " http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index eb25c71..42635f9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * Maintain the Role of a Raft Peer. */ -public class RoleInfo { +class RoleInfo { public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class); private final RaftPeerId id; @@ -53,28 +53,28 @@ public class RoleInfo { this.transitionTime = new AtomicReference<>(new Timestamp()); } - public void transitionRole(RaftPeerRole newRole) { + void transitionRole(RaftPeerRole newRole) { this.role = newRole; this.transitionTime.set(new Timestamp()); } - public long getRoleElapsedTimeMs() { + long getRoleElapsedTimeMs() { return transitionTime.get().elapsedTimeMs(); } - public RaftPeerRole getCurrentRole() { + RaftPeerRole getCurrentRole() { return role; } - public boolean isFollower() { + boolean isFollower() { return role == RaftPeerRole.FOLLOWER; } - public boolean isCandidate() { + boolean isCandidate() { return role == RaftPeerRole.CANDIDATE; } - public boolean isLeader() { + boolean isLeader() { return role == RaftPeerRole.LEADER; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index b5a38fd..c1b05f6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -89,14 +89,17 @@ public abstract class RaftLog implements Closeable { */ public boolean updateLastCommitted(long majorityIndex, long currentTerm) { try(AutoCloseableLock writeLock = writeLock()) { - if (lastCommitted.get() < majorityIndex) { + final long oldCommittedIndex = lastCommitted.get(); + if (oldCommittedIndex < majorityIndex) { // Only update last committed index for current term. See §5.4.2 in // paper for details. final TermIndex entry = getTermIndex(majorityIndex); if (entry != null && entry.getTerm() == currentTerm) { final long commitIndex = Math.min(majorityIndex, getLatestFlushedIndex()); - LOG.debug("{}: Updating lastCommitted to {}", selfId, commitIndex); - lastCommitted.set(commitIndex); + if (commitIndex > oldCommittedIndex) { + LOG.debug("{}: updateLastCommitted {} -> {}", selfId, oldCommittedIndex, commitIndex); + lastCommitted.set(commitIndex); + } return true; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 715370b..30df276 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -238,6 +238,7 @@ class RaftLogWorker implements Runnable { } private void updateFlushedIndex() { + LOG.debug("{}: updateFlushedIndex {} -> {}", name, lastWrittenIndex, flushedIndex); flushedIndex = lastWrittenIndex; pendingFlushNum = 0; 
submitUpdateCommitEvent.run(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index 5014150..bb89bc0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.CheckedFunction; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; @@ -66,49 +66,74 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> LOG.info("Running testWatchRequests"); try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { cluster.start(); - runTestWatchRequestAsync(cluster, LOG); + runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG); } } - static void runTestWatchRequestAsync(MiniRaftCluster cluster, Logger LOG) throws Exception { + static class TestParameters { + final long startLogIndex; + final int numMessages; + final RaftClient writeClient; + final RaftClient watchMajorityClient; + final RaftClient watchAllClient; + final RaftClient watchAllCommittedClient; + final MiniRaftCluster cluster; + final Logger log; + + TestParameters(long startLogIndex, int numMessages, RaftClient writeClient, + RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + MiniRaftCluster cluster, Logger log) { + this.startLogIndex = startLogIndex; + this.numMessages = numMessages; + 
this.writeClient = writeClient; + this.watchMajorityClient = watchMajorityClient; + this.watchAllClient = watchAllClient; + this.watchAllCommittedClient = watchAllCommittedClient; + this.cluster = cluster; + this.log = log; + } + + @Override + public String toString() { + return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages; + } + } + + static long getLogIndex(RaftClient writeClient) throws Exception { + // send a message in order to get the current log index + final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("getLogIndex"); + final RaftClientReply reply = writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertTrue(reply.isSuccess()); + return reply.getLogIndex(); + } + + static void runTest(CheckedFunction<TestParameters, Void, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception { try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) { - long logIndex; - { - // send the first message - final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("message"); - final RaftClientReply reply = writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - Assert.assertTrue(reply.isSuccess()); - logIndex = reply.getLogIndex(); - - final List<CompletableFuture<Void>> futures = new ArrayList<>(); - futures.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY) - .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); - futures.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL) - .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); - 
futures.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED) - .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); - JavaUtils.allOf(futures).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - } - logIndex++; - + final int[] numMessages = {1, 10, 100}; for(int i = 0; i < 5; i++) { - final int numMessages = ThreadLocalRandom.current().nextInt(10) + 1; - runTestWatchRequestAsync(logIndex, numMessages, writeClient, watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG); - logIndex += numMessages; + final long logIndex = getLogIndex(writeClient) + 1; + final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)]; + final TestParameters p = new TestParameters( + logIndex, n, writeClient, watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG); + LOG.info("{}) {}, {}", i, p, cluster.printServers()); + testCase.apply(p); } - LOG.info(cluster.printServers()); } } + static Void runTestWatchRequestAsync(TestParameters p) throws Exception { + runTestWatchRequestAsync(p.startLogIndex, p.numMessages, + p.writeClient, p.watchMajorityClient, p.watchAllClient, p.watchAllCommittedClient, p.cluster, p.log); + return null; + } + static void runTestWatchRequestAsync(long startLogIndex, int numMessages, RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger LOG) throws Exception { - LOG.info("runTestWatchRequestAsync: startLogIndex={}, numMessages={}", startLogIndex, numMessages); - // blockStartTransaction of the leader so that no transaction can be committed MAJORITY final RaftServerImpl leader = cluster.getLeader(); LOG.info("block leader {}", leader.getId()); @@ -177,7 +202,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> Assert.assertTrue(watchAllReply.isSuccess()); final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS); - LOG.info("watchAllCommittedReply({}) = ", logIndex, watchAllCommittedReply); + LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); @@ -188,6 +213,99 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } static <T> void assertNotDone(List<CompletableFuture<T>> futures) { - futures.forEach(f -> Assert.assertFalse(f.isDone())); + futures.forEach(f -> { + if (f.isDone()) { + try { + Assert.fail("Done unexpectedly: " + f.get()); + } catch(Exception e) { + Assert.fail("Done unexpectedly and failed to get: " + e); + } + } + }); + } + + @Test + public void testWatchRequestAsyncChangeLeader() throws Exception { + LOG.info("Running testWatchRequestAsyncChangeLeader"); + try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG); + } + } + + static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { + runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages, + p.writeClient, p.watchMajorityClient, p.watchAllClient, p.watchAllCommittedClient, p.cluster, p.log); + return null; + } + + static void runTestWatchRequestAsyncChangeLeader(long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + MiniRaftCluster cluster, Logger LOG) throws Exception { + // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED + final List<RaftServerImpl> followers = cluster.getFollowers(); + final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size())); + LOG.info("block follower {}", blockedFollower.getId()); + 
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData(); + + // send a message + final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); + + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + final String message = "m" + logIndex; + LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); + replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); + watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); + watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); + } + + Assert.assertEquals(numMessages, replies.size()); + Assert.assertEquals(numMessages, watchMajoritys.size()); + Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchAllCommitteds.size()); + + // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED. 
+ for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex); + final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(logIndex, reply.getLogIndex()); + final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); + Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + } + TimeUnit.SECONDS.sleep(1); + assertNotDone(watchAlls); + assertNotDone(watchAllCommitteds); + + // Now change leader + RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId()); + + // unblock follower so that the transaction can be replicated and committed to all. + SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); + LOG.info("unblock follower {}", blockedFollower.getId()); + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); + final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); + Assert.assertTrue(watchAllReply.isSuccess()); + + final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); + Assert.assertTrue(watchAllCommittedReply.isSuccess()); + { // check commit infos + final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); + Assert.assertEquals(NUM_SERVERS, commitInfos.size()); + commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); + } + } } + }
