Repository: incubator-ratis
Updated Branches:
  refs/heads/master 2fb731f91 -> a7b29658f


RATIS-143. RaftClientImpl should have upper bound on async requests.  
Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a7b29658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a7b29658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a7b29658

Branch: refs/heads/master
Commit: a7b29658ff7c656cb566680fa3c9cc10da5f1d0a
Parents: 2fb731f
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Thu Nov 23 17:07:28 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Thu Nov 23 17:07:28 2017 -0800

----------------------------------------------------------------------
 .../ratis/client/impl/RaftClientImpl.java       |  23 +++-
 .../ratis/client/impl/RaftClientTestUtil.java   |  28 +++++
 .../ratis/grpc/TestRaftAsyncWithGrpc.java       |  24 ++++
 .../apache/ratis/server/impl/RetryCache.java    |   6 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 119 +++++++++++++++++++
 .../TestRaftAsyncWithSimulatedRpc.java          |  24 ++++
 .../SimpleStateMachine4Testing.java             |  22 ++++
 7 files changed, 240 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ea2a3bc..fd1631d 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,6 +21,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.*;
 
@@ -49,13 +50,14 @@ final class RaftClientImpl implements RaftClient {
   private volatile RaftPeerId leaderId;
 
   private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(3);
+  private final Semaphore asyncRequestSemaphore = new Semaphore(100);
 
   RaftClientImpl(ClientId clientId, RaftGroup group,
       RaftPeerId leaderId, RaftClientRpc clientRpc,
       TimeDuration retryInterval) {
     this.clientId = clientId;
     this.clientRpc = clientRpc;
-    this.peers = new ArrayList<>(group.getPeers());
+    this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
     this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
@@ -82,15 +84,21 @@ final class RaftClientImpl implements RaftClient {
   private CompletableFuture<RaftClientReply> sendAsync(Message message,
       boolean readOnly) {
     Objects.requireNonNull(message, "message == null");
+    try {
+      asyncRequestSemaphore.acquire();
+    } catch (InterruptedException e) {
+      throw new CompletionException(IOUtils.toInterruptedIOException(
+          "Interrupted when sending " + message, e));
+    }
     final long callId = nextCallId();
     return sendRequestWithRetryAsync(
         () -> new RaftClientRequest(clientId, leaderId, groupId, callId, 
message, readOnly)
-    ).thenApplyAsync(reply -> {
+    ).thenApply(reply -> {
       if (reply.hasStateMachineException() || 
reply.hasGroupMismatchException()) {
         throw new CompletionException(reply.getException());
       }
       return reply;
-    });
+    }).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
 
   @Override
@@ -188,7 +196,7 @@ final class RaftClientImpl implements RaftClient {
   private CompletableFuture<RaftClientReply> sendRequestAsync(
       RaftClientRequest request) {
     LOG.debug("{}: sendAsync {}", clientId, request);
-    return clientRpc.sendRequestAsync(request).thenApplyAsync(reply -> {
+    return clientRpc.sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive {}", clientId, reply);
       if (reply != null && reply.isNotLeader()) {
         handleNotLeaderException(request, reply.getNotLeaderException());
@@ -197,7 +205,7 @@ final class RaftClientImpl implements RaftClient {
       return reply;
     }).exceptionally(e -> {
       final Throwable cause = e.getCause();
-      if (cause instanceof RaftException) {
+      if (cause instanceof GroupMismatchException) {
         return new RaftClientReply(request, (RaftException) cause);
       } else if (cause instanceof IOException) {
         handleIOException(request, (IOException) cause, null);
@@ -269,6 +277,11 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
+  void assertAsyncRequestSemaphore(int expectedAvailablePermits, int 
expectedQueueLength) {
+    Preconditions.assertTrue(asyncRequestSemaphore.availablePermits() == 
expectedAvailablePermits);
+    Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == 
expectedQueueLength);
+  }
+
   @Override
   public RaftClientRpc getClientRpc() {
     return clientRpc;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
new file mode 100644
index 0000000..50cf914
--- /dev/null
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClient;
+
+/** Interface for testing raft client. */
+public interface RaftClientTestUtil {
+  static void assertAsyncRequestSemaphore(
+      RaftClient client, int expectedAvailablePermits, int 
expectedQueueLength) {
+    ((RaftClientImpl) 
client).assertAsyncRequestSemaphore(expectedAvailablePermits, 
expectedQueueLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
new file mode 100644
index 0000000..752a3dd
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftAsyncTests;
+
+public class TestRaftAsyncWithGrpc extends 
RaftAsyncTests<MiniRaftClusterWithGRpc>
+    implements MiniRaftClusterWithGRpc.FactoryGet {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
index 4e65124..afe2a7e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
@@ -99,6 +99,10 @@ public class RetryCache implements Closeable {
       return isFailed() || replyFuture.isDone();
     }
 
+    boolean isCompletedNormally() {
+      return !failed && replyFuture.isDone() && 
!replyFuture.isCompletedExceptionally() && !replyFuture.isCancelled();
+    }
+
     void updateResult(RaftClientReply reply) {
       assert !replyFuture.isDone() && !replyFuture.isCancelled();
       replyFuture.complete(reply);
@@ -162,7 +166,7 @@ public class RetryCache implements Closeable {
     } catch (ExecutionException e) {
       throw new IllegalStateException(e);
     }
-    Preconditions.assertTrue(entry != null && !entry.isDone(),
+    Preconditions.assertTrue(entry != null && !entry.isCompletedNormally(),
         "retry cache entry should be pending: %s", entry);
     return entry;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
new file mode 100644
index 0000000..336ba36
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.RaftClientTestUtil;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends 
BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RaftAsyncTests.class);
+
+  public static final int NUM_SERVERS = 5;
+
+  private CLUSTER cluster = null;
+
+  @Before
+  public void setup() throws IOException {
+    cluster = newCluster(NUM_SERVERS);
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testAsyncRequestSemaphore()
+      throws InterruptedException, IOException {
+    LOG.info("Running testBasicAppendEntries");
+    waitForLeader(cluster);
+
+    int numMessages = 100;
+    CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
+    final RaftTestUtil.SimpleMessage[] messages = 
RaftTestUtil.SimpleMessage.create(numMessages);
+    final RaftClient client = cluster.createClient();
+    //Set the blockTransaction flag on every server so that startTransaction blocks
+    for (RaftServerProxy server : cluster.getServers()) {
+      ((SimpleStateMachine4Testing) 
server.getStateMachine()).setBlockTransaction(true);
+    }
+
+    //Send numMessages requests; they stay blocked and so do not release the client semaphore permits
+    AtomicInteger blockedRequestsCount = new AtomicInteger();
+    for (int i=0; i<numMessages; i++) {
+      blockedRequestsCount.getAndIncrement();
+      futures[i] = client.sendAsync(messages[i]);
+      blockedRequestsCount.decrementAndGet();
+    }
+    Assert.assertTrue(blockedRequestsCount.get() == 0);
+
+    ExecutorService threadPool = Executors.newFixedThreadPool(1);
+    futures[numMessages] = CompletableFuture.supplyAsync(() -> {
+      blockedRequestsCount.incrementAndGet();
+      client.sendAsync(new RaftTestUtil.SimpleMessage("n1"));
+      blockedRequestsCount.decrementAndGet();
+      return null;
+    }, threadPool);
+
+    //Wait until the extra sendAsync call has started on the thread pool
+    while (blockedRequestsCount.get() != 1) {
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue(blockedRequestsCount.get() == 1);
+    //Since all semaphore permits are acquired, the last message sent is waiting in the semaphore queue
+    RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
+
+    //Unset the blockTransaction flag so that semaphore permits can be released
+    for (RaftServerProxy server : cluster.getServers()) {
+      ((SimpleStateMachine4Testing) 
server.getStateMachine()).setBlockTransaction(false);
+    }
+    for(int i=0; i<=numMessages; i++){
+      futures[i].join();
+    }
+    Assert.assertTrue(blockedRequestsCount.get() == 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
new file mode 100644
index 0000000..7741680
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.RaftAsyncTests;
+
+public class TestRaftAsyncWithSimulatedRpc extends 
RaftAsyncTests<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index a6e6672..d041fcf 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -45,6 +45,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
 
 /**
  * A {@link StateMachine} implementation example that simply stores all the log
@@ -77,6 +78,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
 
   private volatile boolean running = true;
+  private boolean blockTransaction = false;
+  private final Semaphore blockingSemaphore = new Semaphore(1);
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
 
   SimpleStateMachine4Testing() {
@@ -225,6 +228,16 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   @Override
   public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
+    if (blockTransaction) {
+      try {
+        //blocks until blockTransaction is set to false
+        blockingSemaphore.acquire();
+        blockingSemaphore.release();
+      } catch (InterruptedException e) {
+        LOG.error("Could not block applyTransaction", e);
+        Thread.currentThread().interrupt();
+      }
+    }
     return new TransactionContext(this, request, SMLogEntryProto.newBuilder()
         .setData(request.getMessage().getContent())
         .build());
@@ -246,4 +259,13 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   public LogEntryProto[] getContent() {
     return list.toArray(new LogEntryProto[list.size()]);
   }
+
+  public void setBlockTransaction(boolean blockTransactionVal) throws 
InterruptedException {
+    this.blockTransaction = blockTransactionVal;
+    if (blockTransactionVal) {
+      blockingSemaphore.acquire();
+    } else {
+      blockingSemaphore.release();
+    }
+  }
 }

Reply via email to