Repository: incubator-ratis
Updated Branches:
  refs/heads/master a0843f4c2 -> e35d954e3


RATIS-327. Fix bugs in SimpleStateMachine4Testing. Contributed by Tsz Wo 
Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e35d954e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e35d954e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e35d954e

Branch: refs/heads/master
Commit: e35d954e34adb4417dcc96c563df79b559768897
Parents: a0843f4
Author: Lokesh Jain <[email protected]>
Authored: Tue Oct 2 15:58:05 2018 +0530
Committer: Lokesh Jain <[email protected]>
Committed: Tue Oct 2 15:58:05 2018 +0530

----------------------------------------------------------------------
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |  26 ++---
 .../apache/ratis/server/impl/LeaderState.java   |   1 -
 .../ratis/server/impl/PendingRequests.java      |   7 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |   2 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   |  46 ++++----
 .../TestRaftServerLeaderElectionTimeout.java    |   2 +-
 .../ratis/TestRaftServerSlownessDetection.java  |   7 +-
 .../SimpleStateMachine4Testing.java             | 108 +++++++++----------
 8 files changed, 90 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 2d0af07..7ae385d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -68,28 +68,22 @@ public class TestRaftWithGrpc
     long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
-      cluster.getServerAliveStream().forEach(raftServer -> {
-        try {
-          if (!raftServer.isLeader()) {
-            ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(true);
-          }
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while blocking append", e);
-        }
-      });
+      cluster.getServerAliveStream()
+          .filter(impl -> !impl.isLeader())
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
+
       CompletableFuture<RaftClientReply>
           replyFuture = client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
       Thread.sleep(waitTime);
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertTrue(!replyFuture.isDone());
       // unblock append request.
-      cluster.getServerAliveStream().forEach(raftServer -> {
-        try {
-          ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(false);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while unblocking append", e);
-        }
-      });
+      cluster.getServerAliveStream()
+          .filter(impl -> !impl.isLeader())
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
+
       long index = cluster.getLeader().getState().getLog().getNextIndex();
       TermIndex[] leaderEntries = 
cluster.getLeader().getState().getLog().getEntries(0, Integer.MAX_VALUE);
       // The entries have been appended in the followers

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e355d4f..be54346 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -494,7 +494,6 @@ public class LeaderState {
       if (conf.isTransitional()) {
         replicateNewConf();
       } else { // the (new) log entry has been committed
-        LOG.debug("{} sends success to setConfiguration request", 
server.getId());
         pendingRequests.replySetConfiguration();
         // if the leader is not included in the current configuration, step 
down
         if (!conf.containsInConf(server.getId())) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c84f944..9a4ed74 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -174,9 +174,11 @@ class PendingRequests {
     // commits the new configuration while it has not received the retry
     // request from the client
     if (pendingSetConf != null) {
+      final RaftClientRequest request = pendingSetConf.getRequest();
+      LOG.debug("{}: sends success for {}", server.getId(), request);
       // for setConfiguration we do not need to wait for statemachine. send 
back
       // reply after it's committed.
-      pendingSetConf.setReply(new RaftClientReply(pendingSetConf.getRequest(), 
server.getCommitInfos()));
+      pendingSetConf.setReply(new RaftClientReply(request, 
server.getCommitInfos()));
       pendingSetConf = null;
     }
   }
@@ -216,8 +218,7 @@ class PendingRequests {
    * requests since they have not got applied to the state machine yet.
    */
   void sendNotLeaderResponses() throws IOException {
-    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
-        server.getId());
+    LOG.info("{}: sendNotLeaderResponses", server.getId());
 
     // notify the state machine about stepping down
     final NotLeaderException nle = server.generateNotLeaderException();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 60f7050..7db7685 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -485,7 +485,7 @@ public abstract class MiniRaftCluster implements Closeable {
     return getRaftServerImpl(servers.get(id));
   }
 
-  private RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
+  public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
     return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 776e1f1..26213e2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -25,8 +25,6 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -91,13 +89,6 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     }
   }
 
-  static void setBlockTransaction(boolean block, MiniRaftCluster cluster) 
throws InterruptedException {
-    for (RaftServerProxy server : cluster.getServers()) {
-      final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, 
cluster.getGroupId());
-      
((SimpleStateMachine4Testing)impl.getStateMachine()).setBlockTransaction(block);
-    }
-  }
-
   @Test
   public void testAsyncRequestSemaphore() throws Exception {
     LOG.info("Running testAsyncRequestSemaphore");
@@ -111,7 +102,10 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     final RaftTestUtil.SimpleMessage[] messages = 
RaftTestUtil.SimpleMessage.create(numMessages);
     final RaftClient client = cluster.createClient();
     //Set blockTransaction flag so that transaction blocks
-    setBlockTransaction(true, cluster);
+    cluster.getServers().stream()
+        .map(cluster::getRaftServerImpl)
+        .map(SimpleStateMachine4Testing::get)
+        .forEach(SimpleStateMachine4Testing::blockStartTransaction);
 
     //Send numMessages which are blocked and do not release the client 
semaphore permits
     AtomicInteger blockedRequestsCount = new AtomicInteger();
@@ -139,7 +133,11 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
 
     //Unset the blockTransaction flag so that semaphore permits can be released
-    setBlockTransaction(false, cluster);
+    cluster.getServers().stream()
+        .map(cluster::getRaftServerImpl)
+        .map(SimpleStateMachine4Testing::get)
+        .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+
     for(int i=0; i<=numMessages; i++){
       futures[i].join();
     }
@@ -285,27 +283,21 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
-      cluster.getServerAliveStream().forEach(raftServer -> {
-        try {
-          if (!raftServer.isLeader()) {
-            ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(true);
-          }
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while blocking append", e);
-        }
-      });
+      cluster.getServerAliveStream()
+          .filter(impl -> !impl.isLeader())
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
+
       CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
       Thread.sleep(waitTime);
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertTrue(!replyFuture.isDone());
       // unblock append request.
-      cluster.getServerAliveStream().forEach(raftServer -> {
-        try {
-          ((SimpleStateMachine4Testing) 
raftServer.getStateMachine()).setBlockAppend(false);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted while unblocking append", e);
-        }
-      });
+      cluster.getServerAliveStream()
+          .filter(impl -> !impl.isLeader())
+          .map(SimpleStateMachine4Testing::get)
+          .forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
+
       replyFuture.get();
       Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
index 6ec321f..55bcdfc 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
@@ -86,7 +86,7 @@ public class TestRaftServerLeaderElectionTimeout extends 
BaseTest {
     cluster.killServer(failedFollower.getId());
     cluster.killServer(cluster.getLeader().getId());
 
-    // Wait to ensure that leader election is trigerred and also state machine 
callback is triggered
+    // Wait to ensure that leader election is triggered and also state machine 
callback is triggered
     Thread.sleep( leaderElectionTimeout * 2);
 
     RaftProtos.RoleInfoProto roleInfoProto =

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index b8ef87e..e109598 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -18,7 +18,6 @@
 package org.apache.ratis;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -45,7 +44,6 @@ import java.util.concurrent.TimeUnit;
 public class TestRaftServerSlownessDetection extends BaseTest {
   static {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   public static final int NUM_SERVERS = 3;
@@ -100,8 +98,9 @@ public class TestRaftServerSlownessDetection extends 
BaseTest {
         roleInfoProto.getLeaderInfo().getFollowerInfoList();
     //Assert that the node shutdown is lagging behind
     for (RaftProtos.ServerRpcProto serverProto : followers) {
-      Assert.assertTrue(!(RaftPeerId.valueOf(serverProto.getId().getId()) == 
failedFollower.getId()) ||
-          serverProto.getLastRpcElapsedTimeMs() > slownessTimeout);
+      if 
(RaftPeerId.valueOf(serverProto.getId().getId()).equals(failedFollower.getId()))
 {
+        Assert.assertTrue(serverProto.getLastRpcElapsedTimeMs() > 
slownessTimeout);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e35d954e/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 6285701..dcb899a 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -49,7 +49,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.ExecutionException;
 
 /**
  * A {@link StateMachine} implementation example that simply stores all the log
@@ -81,12 +81,40 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
 
   private volatile boolean running = true;
-  private volatile boolean blockTransaction = false;
-  private volatile boolean blockAppend = false;
-  private final Semaphore blockingSemaphore = new Semaphore(1);
+
+
+  static class Blocking {
+    enum Type {
+      START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA
+    }
+
+    private final EnumMap<Type, CompletableFuture<Void>> maps = new 
EnumMap<>(Type.class);
+
+    void block(Type type) {
+      final CompletableFuture<Void> future = new CompletableFuture<>();
+      final CompletableFuture<Void> previous = maps.putIfAbsent(type, future);
+      Preconditions.assertNull(previous, "previous");
+    }
+
+    void unblock(Type type) {
+      final CompletableFuture<Void> future = maps.remove(type);
+      Objects.requireNonNull(future, "future == null");
+      future.complete(null);
+    }
+
+    void await(Type type) {
+      try {
+        maps.getOrDefault(type, CompletableFuture.completedFuture(null)).get();
+      } catch(InterruptedException | ExecutionException e) {
+        throw new IllegalStateException("Failed to await " + type, e);
+      }
+    }
+  }
+
+  private final Blocking blocking = new Blocking();
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
-  private RoleInfoProto slownessInfo = null;
-  private RoleInfoProto leaderElectionTimeoutInfo = null;
+  private volatile RoleInfoProto slownessInfo = null;
+  private volatile RoleInfoProto leaderElectionTimeoutInfo = null;
 
   public SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
@@ -251,18 +279,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   @Override
-  public TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException {
-    if (blockTransaction) {
-      try {
-        //blocks until blockTransaction is set to false
-        blockingSemaphore.acquire();
-        blockingSemaphore.release();
-      } catch (InterruptedException e) {
-        LOG.error("Could not block applyTransaction", e);
-        Thread.currentThread().interrupt();
-      }
-    }
+  public TransactionContext startTransaction(RaftClientRequest request) throws 
IOException {
+    blocking.await(Blocking.Type.START_TRANSACTION);
     return new TransactionContextImpl(this, request, 
SMLogEntryProto.newBuilder()
         .setData(request.getMessage().getContent())
         .setStateMachineData(ByteString.copyFromUtf8("StateMachine 
Data")).build());
@@ -270,34 +288,14 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
 
   @Override
   public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
-    CompletableFuture<?> f = new CompletableFuture();
-    if (blockAppend) {
-      try {
-        blockingSemaphore.acquire();
-        blockingSemaphore.release();
-      } catch (InterruptedException e) {
-        LOG.error("Could not block writeStateMachineData", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-    f.complete(null);
-    return f;
+    blocking.await(Blocking.Type.WRITE_STATE_MACHINE_DATA);
+    return CompletableFuture.completedFuture(null);
   }
 
   @Override
   public CompletableFuture<ByteString> readStateMachineData(LogEntryProto 
entry) {
-    CompletableFuture<ByteString> f = new CompletableFuture<>();
-    if (blockAppend) {
-      try {
-        blockingSemaphore.acquire();
-        blockingSemaphore.release();
-      } catch (InterruptedException e) {
-        LOG.error("Could not block readStateMachineData", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-    f.complete(null);
-    return f;
+    blocking.await(Blocking.Type.READ_STATE_MACHINE_DATA);
+    return CompletableFuture.completedFuture(null);
   }
 
   @Override
@@ -312,31 +310,29 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return list.toArray(new LogEntryProto[list.size()]);
   }
 
-  public void setBlockTransaction(boolean blockTransactionVal) throws 
InterruptedException {
-    this.blockTransaction = blockTransactionVal;
-    if (blockTransactionVal) {
-      blockingSemaphore.acquire();
-    } else {
-      blockingSemaphore.release();
-    }
+  public void blockStartTransaction() {
+    blocking.block(Blocking.Type.START_TRANSACTION);
+  }
+  public void unblockStartTransaction() {
+    blocking.unblock(Blocking.Type.START_TRANSACTION);
   }
 
-  public void setBlockAppend(boolean blockAppendVal) throws 
InterruptedException {
-    this.blockAppend = blockAppendVal;
-    if (blockAppendVal) {
-      blockingSemaphore.acquire();
-    } else {
-      blockingSemaphore.release();
-    }
+  public void blockWriteStateMachineData() {
+    blocking.block(Blocking.Type.WRITE_STATE_MACHINE_DATA);
+  }
+  public void unblockWriteStateMachineData() {
+    blocking.unblock(Blocking.Type.WRITE_STATE_MACHINE_DATA);
   }
 
   @Override
   public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+    LOG.info("{}: notifySlowness {}, {}", this, group, roleInfoProto);
     slownessInfo = roleInfoProto;
   }
 
   @Override
   public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto 
roleInfoProto) {
+    LOG.info("{}: notifyExtendedNoLeader {}, {}", this, group, roleInfoProto);
     leaderElectionTimeoutInfo = roleInfoProto;
   }
 }

Reply via email to