Repository: incubator-ratis Updated Branches: refs/heads/master a0843f4c2 -> e35d954e3
RATIS-327. Fix bugs in SimpleStateMachine4Testing. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e35d954e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e35d954e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e35d954e Branch: refs/heads/master Commit: e35d954e34adb4417dcc96c563df79b559768897 Parents: a0843f4 Author: Lokesh Jain <[email protected]> Authored: Tue Oct 2 15:58:05 2018 +0530 Committer: Lokesh Jain <[email protected]> Committed: Tue Oct 2 15:58:05 2018 +0530 ---------------------------------------------------------------------- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 26 ++--- .../apache/ratis/server/impl/LeaderState.java | 1 - .../ratis/server/impl/PendingRequests.java | 7 +- .../java/org/apache/ratis/MiniRaftCluster.java | 2 +- .../java/org/apache/ratis/RaftAsyncTests.java | 46 ++++---- .../TestRaftServerLeaderElectionTimeout.java | 2 +- .../ratis/TestRaftServerSlownessDetection.java | 7 +- .../SimpleStateMachine4Testing.java | 108 +++++++++---------- 8 files changed, 90 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 2d0af07..7ae385d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -68,28 +68,22 @@ public class TestRaftWithGrpc long waitTime = 5000; try (final RaftClient client = cluster.createClient()) { // block append requests - cluster.getServerAliveStream().forEach(raftServer -> { - try { - if (!raftServer.isLeader()) { - ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(true); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while blocking append", e); - } - }); + cluster.getServerAliveStream() + .filter(impl -> !impl.isLeader()) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData); + CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc")); Thread.sleep(waitTime); // replyFuture should not be completed until append request is unblocked. Assert.assertTrue(!replyFuture.isDone()); // unblock append request. - cluster.getServerAliveStream().forEach(raftServer -> { - try { - ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(false); - } catch (InterruptedException e) { - LOG.error("Interrupted while unblocking append", e); - } - }); + cluster.getServerAliveStream() + .filter(impl -> !impl.isLeader()) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData); + long index = cluster.getLeader().getState().getLog().getNextIndex(); TermIndex[] leaderEntries = cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE); // The entries have been appended in the followers http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index e355d4f..be54346 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -494,7 +494,6 @@ public class LeaderState { if (conf.isTransitional()) { replicateNewConf(); } else { // the (new) log entry has been committed - LOG.debug("{} sends success to setConfiguration request", server.getId()); pendingRequests.replySetConfiguration(); // if the leader is not included in the current configuration, step down if (!conf.containsInConf(server.getId())) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index c84f944..9a4ed74 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -174,9 +174,11 @@ class PendingRequests { // commits the new configuration while it has not received the retry // request from the client if (pendingSetConf != null) { + final RaftClientRequest request = pendingSetConf.getRequest(); + LOG.debug("{}: sends success for {}", server.getId(), request); // for setConfiguration we do not need to wait for statemachine. send back // reply after it's committed. - pendingSetConf.setReply(new RaftClientReply(pendingSetConf.getRequest(), server.getCommitInfos())); + pendingSetConf.setReply(new RaftClientReply(request, server.getCommitInfos())); pendingSetConf = null; } } @@ -216,8 +218,7 @@ class PendingRequests { * requests since they have not got applied to the state machine yet. */ void sendNotLeaderResponses() throws IOException { - LOG.info("{} sends responses before shutting down PendingRequestsHandler", - server.getId()); + LOG.info("{}: sendNotLeaderResponses", server.getId()); // notify the state machine about stepping down final NotLeaderException nle = server.generateNotLeaderException(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 60f7050..7db7685 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -485,7 +485,7 @@ public abstract class MiniRaftCluster implements Closeable { return getRaftServerImpl(servers.get(id)); } - private RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) { + public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) { return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 776e1f1..26213e2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -25,8 +25,6 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; -import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; @@ -91,13 +89,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba } } - static void setBlockTransaction(boolean block, MiniRaftCluster cluster) throws InterruptedException { - for (RaftServerProxy server : cluster.getServers()) { - final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId()); - ((SimpleStateMachine4Testing)impl.getStateMachine()).setBlockTransaction(block); - } - } - @Test public void testAsyncRequestSemaphore() throws Exception { LOG.info("Running testAsyncRequestSemaphore"); @@ -111,7 +102,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages); final RaftClient client = cluster.createClient(); //Set blockTransaction flag so that transaction blocks - setBlockTransaction(true, cluster); + cluster.getServers().stream() + .map(cluster::getRaftServerImpl) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::blockStartTransaction); //Send numMessages which are blocked and do not release the client semaphore permits AtomicInteger blockedRequestsCount = new AtomicInteger(); @@ -139,7 +133,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1); //Unset the blockTransaction flag so that semaphore permits can be released - setBlockTransaction(false, cluster); + cluster.getServers().stream() + .map(cluster::getRaftServerImpl) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::unblockStartTransaction); + for(int i=0; i<=numMessages; i++){ futures[i].join(); } @@ -285,27 +283,21 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba long waitTime = 5000; try (final RaftClient client = cluster.createClient()) { // block append requests - cluster.getServerAliveStream().forEach(raftServer -> { - try { - if (!raftServer.isLeader()) { - ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(true); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while blocking append", e); - } - }); + cluster.getServerAliveStream() + .filter(impl -> !impl.isLeader()) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData); + CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new RaftTestUtil.SimpleMessage("abc")); Thread.sleep(waitTime); // replyFuture should not be completed until append request is unblocked. Assert.assertTrue(!replyFuture.isDone()); // unblock append request. - cluster.getServerAliveStream().forEach(raftServer -> { - try { - ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(false); - } catch (InterruptedException e) { - LOG.error("Interrupted while unblocking append", e); - } - }); + cluster.getServerAliveStream() + .filter(impl -> !impl.isLeader()) + .map(SimpleStateMachine4Testing::get) + .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData); + replyFuture.get(); Assert.assertTrue(System.currentTimeMillis() - time > waitTime); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java index 6ec321f..55bcdfc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java @@ -86,7 +86,7 @@ public class TestRaftServerLeaderElectionTimeout extends BaseTest { cluster.killServer(failedFollower.getId()); cluster.killServer(cluster.getLeader().getId()); - // Wait to ensure that leader election is trigerred and also state machine callback is triggered + // Wait to ensure that leader election is triggered and also state machine callback is triggered Thread.sleep( leaderElectionTimeout * 2); RaftProtos.RoleInfoProto roleInfoProto = http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java index b8ef87e..e109598 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -18,7 +18,6 @@ package org.apache.ratis; import org.apache.log4j.Level; -import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; @@ -45,7 +44,6 @@ import java.util.concurrent.TimeUnit; public class TestRaftServerSlownessDetection extends BaseTest { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } public static final int NUM_SERVERS = 3; @@ -100,8 +98,9 @@ public class TestRaftServerSlownessDetection extends BaseTest { roleInfoProto.getLeaderInfo().getFollowerInfoList(); //Assert that the node shutdown is lagging behind for (RaftProtos.ServerRpcProto serverProto : followers) { - Assert.assertTrue(!(RaftPeerId.valueOf(serverProto.getId().getId()) == failedFollower.getId()) || - serverProto.getLastRpcElapsedTimeMs() > slownessTimeout); + if (RaftPeerId.valueOf(serverProto.getId().getId()).equals(failedFollower.getId())) { + Assert.assertTrue(serverProto.getLastRpcElapsedTimeMs() > slownessTimeout); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 6285701..dcb899a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -49,7 +49,7 @@ import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ExecutionException; /** * A {@link StateMachine} implementation example that simply stores all the log @@ -81,12 +81,40 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); private volatile boolean running = true; - private volatile boolean blockTransaction = false; - private volatile boolean blockAppend = false; - private final Semaphore blockingSemaphore = new Semaphore(1); + + + static class Blocking { + enum Type { + START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA + } + + private final EnumMap<Type, CompletableFuture<Void>> maps = new EnumMap<>(Type.class); + + void block(Type type) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + final CompletableFuture<Void> previous = maps.putIfAbsent(type, future); + Preconditions.assertNull(previous, "previous"); + } + + void unblock(Type type) { + final CompletableFuture<Void> future = maps.remove(type); + Objects.requireNonNull(future, "future == null"); + future.complete(null); + } + + void await(Type type) { + try { + maps.getOrDefault(type, CompletableFuture.completedFuture(null)).get(); + } catch(InterruptedException | ExecutionException e) { + throw new IllegalStateException("Failed to await " + type, e); + } + } + } + + private final Blocking blocking = new Blocking(); private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; - private RoleInfoProto slownessInfo = null; - private RoleInfoProto leaderElectionTimeoutInfo = null; + private volatile RoleInfoProto slownessInfo = null; + private volatile RoleInfoProto leaderElectionTimeoutInfo = null; public SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { @@ -251,18 +279,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override - public TransactionContext startTransaction(RaftClientRequest request) - throws IOException { - if (blockTransaction) { - try { - //blocks until blockTransaction is set to false - blockingSemaphore.acquire(); - blockingSemaphore.release(); - } catch (InterruptedException e) { - LOG.error("Could not block applyTransaction", e); - Thread.currentThread().interrupt(); - } - } + public TransactionContext startTransaction(RaftClientRequest request) throws IOException { + blocking.await(Blocking.Type.START_TRANSACTION); return new TransactionContextImpl(this, request, SMLogEntryProto.newBuilder() .setData(request.getMessage().getContent()) .setStateMachineData(ByteString.copyFromUtf8("StateMachine Data")).build()); @@ -270,34 +288,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { - CompletableFuture<?> f = new CompletableFuture(); - if (blockAppend) { - try { - blockingSemaphore.acquire(); - blockingSemaphore.release(); - } catch (InterruptedException e) { - LOG.error("Could not block writeStateMachineData", e); - Thread.currentThread().interrupt(); - } - } - f.complete(null); - return f; + blocking.await(Blocking.Type.WRITE_STATE_MACHINE_DATA); + return CompletableFuture.completedFuture(null); } @Override public CompletableFuture<ByteString> readStateMachineData(LogEntryProto entry) { - CompletableFuture<ByteString> f = new CompletableFuture<>(); - if (blockAppend) { - try { - blockingSemaphore.acquire(); - blockingSemaphore.release(); - } catch (InterruptedException e) { - LOG.error("Could not block readStateMachineData", e); - Thread.currentThread().interrupt(); - } - } - f.complete(null); - return f; + blocking.await(Blocking.Type.READ_STATE_MACHINE_DATA); + return CompletableFuture.completedFuture(null); } @Override @@ -312,31 +310,29 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return list.toArray(new LogEntryProto[list.size()]); } - public void setBlockTransaction(boolean blockTransactionVal) throws InterruptedException { - this.blockTransaction = blockTransactionVal; - if (blockTransactionVal) { - blockingSemaphore.acquire(); - } else { - blockingSemaphore.release(); - } + public void blockStartTransaction() { + blocking.block(Blocking.Type.START_TRANSACTION); + } + public void unblockStartTransaction() { + blocking.unblock(Blocking.Type.START_TRANSACTION); } - public void setBlockAppend(boolean blockAppendVal) throws InterruptedException { - this.blockAppend = blockAppendVal; - if (blockAppendVal) { - blockingSemaphore.acquire(); - } else { - blockingSemaphore.release(); - } + public void blockWriteStateMachineData() { + blocking.block(Blocking.Type.WRITE_STATE_MACHINE_DATA); + } + public void unblockWriteStateMachineData() { + blocking.unblock(Blocking.Type.WRITE_STATE_MACHINE_DATA); } @Override public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { + LOG.info("{}: notifySlowness {}, {}", this, group, roleInfoProto); slownessInfo = roleInfoProto; } @Override public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { + LOG.info("{}: notifyExtendedNoLeader {}, {}", this, group, roleInfoProto); leaderElectionTimeoutInfo = roleInfoProto; } }
