Repository: incubator-ratis
Updated Branches:
  refs/heads/master c80423652 -> 5de0fa647


RATIS-156. Implement configuration for client async requests.  Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/5de0fa64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/5de0fa64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/5de0fa64

Branch: refs/heads/master
Commit: 5de0fa6473bc186926840154e7cf534b3f9839b1
Parents: c804236
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Sat Nov 25 20:37:11 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Sat Nov 25 20:37:11 2017 -0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  2 +-
 .../ratis/client/RaftClientConfigKeys.java      | 28 +++++++++++
 .../org/apache/ratis/client/RaftClientRpc.java  | 12 +----
 .../ratis/client/impl/ClientImplUtils.java      |  9 ++--
 .../ratis/client/impl/RaftClientImpl.java       | 18 ++++---
 .../ratis/client/impl/RaftClientTestUtil.java   |  4 ++
 .../java/org/apache/ratis/RaftAsyncTests.java   | 52 ++++++++++++--------
 .../TestRaftAsyncWithSimulatedRpc.java          | 24 ---------
 .../simulation/TestRaftWithSimulatedRpc.java    |  5 --
 9 files changed, 84 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 4b152cb..d2c5c1a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -106,7 +106,7 @@ public interface RaftClient extends Closeable {
           Objects.requireNonNull(group, "The 'group' field is not 
initialized."),
           leaderId,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not 
initialized."),
-          retryInterval);
+          retryInterval, properties);
     }
 
     /** Set {@link RaftClient} ID. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 9e7bd76..03f12cb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -39,6 +39,34 @@ public interface RaftClientConfigKeys {
     }
   }
 
+  interface Async {
+    String PREFIX = RaftClientConfigKeys.PREFIX + ".async";
+
+    String MAX_OUTSTANDING_REQUESTS_KEY = PREFIX + ".outstanding-requests.max";
+    int MAX_OUTSTANDING_REQUESTS_DEFAULT = 100;
+
+    static int maxOutstandingRequests(RaftProperties properties) {
+      return getInt(properties::getInt, MAX_OUTSTANDING_REQUESTS_KEY,
+          MAX_OUTSTANDING_REQUESTS_DEFAULT, requireMin(2));
+    }
+
+    static void setMaxOutstandingRequests(RaftProperties properties, int 
outstandingRequests) {
+      setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, 
outstandingRequests);
+    }
+
+    String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads";
+    int SCHEDULER_THREADS_DEFAULT = 3;
+
+    static int schedulerThreads(RaftProperties properties) {
+      return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
+          SCHEDULER_THREADS_DEFAULT);
+    }
+
+    static void setSchedulerThreads(RaftProperties properties, int 
schedulerThreads) {
+      setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads);
+    }
+  }
+
   static void main(String[] args) {
     printAll(RaftClientConfigKeys.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 310f9df..51f4430 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -25,20 +25,12 @@ import org.apache.ratis.protocol.RaftPeerId;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 
 /** The client side rpc of a raft service. */
 public interface RaftClientRpc extends Closeable {
   /** Async call to send a request. */
-  default CompletableFuture<RaftClientReply> sendRequestAsync(
-      RaftClientRequest request) {
-    return CompletableFuture.supplyAsync(() -> {
-      try {
-        return sendRequest(request);
-      } catch (Exception e) {
-        throw new CompletionException(e);
-      }
-    });
+  default CompletableFuture<RaftClientReply> 
sendRequestAsync(RaftClientRequest request) {
+    throw new UnsupportedOperationException(getClass() + " does not support 
this method.");
   }
 
   /** Send a request. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 2ae2f35..e7c89f3 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.ClientId;
@@ -26,9 +27,9 @@ import org.apache.ratis.protocol.RaftPeerId;
 
 /** Client utilities for internal use. */
 public class ClientImplUtils {
-  public static RaftClient newRaftClient(
-      ClientId clientId, RaftGroup group, RaftPeerId leaderId,
-      RaftClientRpc clientRpc, TimeDuration retryInterval) {
-    return new RaftClientImpl(clientId, group, leaderId, clientRpc, 
retryInterval);
+  public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
+      RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval,
+      RaftProperties properties) {
+    return new RaftClientImpl(clientId, group, leaderId, clientRpc, 
retryInterval, properties);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index fd1631d..906d55f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -18,7 +18,9 @@
 package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.Preconditions;
@@ -49,12 +51,11 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  private final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(3);
-  private final Semaphore asyncRequestSemaphore = new Semaphore(100);
+  private final ScheduledExecutorService scheduler;
+  private final Semaphore asyncRequestSemaphore;
 
-  RaftClientImpl(ClientId clientId, RaftGroup group,
-      RaftPeerId leaderId, RaftClientRpc clientRpc,
-      TimeDuration retryInterval) {
+  RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
+      RaftClientRpc clientRpc, TimeDuration retryInterval, RaftProperties 
properties) {
     this.clientId = clientId;
     this.clientRpc = clientRpc;
     this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
@@ -62,7 +63,8 @@ final class RaftClientImpl implements RaftClient {
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
     this.retryInterval = retryInterval;
-
+    asyncRequestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
+    scheduler = 
Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
     clientRpc.addServers(peers);
   }
 
@@ -282,6 +284,10 @@ final class RaftClientImpl implements RaftClient {
     Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == 
expectedQueueLength);
   }
 
+  void assertScheduler(int numThreads) {
+    Preconditions.assertTrue(((ScheduledThreadPoolExecutor) 
scheduler).getCorePoolSize() == numThreads);
+  }
+
   @Override
   public RaftClientRpc getClientRpc() {
     return clientRpc;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 50cf914..cab2dd0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -25,4 +25,8 @@ public interface RaftClientTestUtil {
       RaftClient client, int expectedAvailablePermits, int 
expectedQueueLength) {
     ((RaftClientImpl) 
client).assertAsyncRequestSemaphore(expectedAvailablePermits, 
expectedQueueLength);
   }
+
+  static void assertScheduler(RaftClient client, int numThreads){
+    ((RaftClientImpl) client).assertScheduler(numThreads);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 336ba36..78c95c7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -19,16 +19,16 @@ package org.apache.ratis;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.LogUtils;
 import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.*;
@@ -43,37 +43,48 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
-  {
-    final RaftProperties p = getProperties();
-    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SimpleStateMachine4Testing.class, StateMachine.class);
-  }
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(RaftAsyncTests.class);
+  private RaftProperties properties;
 
-  public static final int NUM_SERVERS = 5;
-
-  private CLUSTER cluster = null;
+  public static final int NUM_SERVERS = 3;
 
   @Before
   public void setup() throws IOException {
-    cluster = newCluster(NUM_SERVERS);
-    Assert.assertNull(cluster.getLeader());
-    cluster.start();
+    properties = new RaftProperties();
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
   }
 
-  @After
-  public void tearDown() {
-    cluster.shutdown();
+  @Test
+  public void testAsyncConfiguration(){
+    LOG.info("Running testAsyncConfiguration");
+    RaftClient.Builder clientBuilder = RaftClient.newBuilder()
+        .setRaftGroup(RaftGroup.emptyGroup())
+        .setProperties(properties);
+    RaftClient client = clientBuilder.build();
+    int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT;
+    int maxOutstandingRequests = 
RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT;
+    RaftClientTestUtil.assertScheduler(client, numThreads);
+    RaftClientTestUtil.assertAsyncRequestSemaphore(client, 
maxOutstandingRequests, 0);
+
+    numThreads = 200;
+    maxOutstandingRequests = 5;
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 
maxOutstandingRequests);
+    RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads);
+    client = clientBuilder.build();
+    RaftClientTestUtil.assertScheduler(client, numThreads);
+    RaftClientTestUtil.assertAsyncRequestSemaphore(client, 
maxOutstandingRequests, 0);
   }
 
   @Test
   public void testAsyncRequestSemaphore()
       throws InterruptedException, IOException {
-    LOG.info("Running testBasicAppendEntries");
+    LOG.info("Running testAsyncRequestSemaphore");
+    CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
     waitForLeader(cluster);
 
-    int numMessages = 100;
+    int numMessages = 
RaftClientConfigKeys.Async.maxOutstandingRequests(properties);
     CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
     final RaftTestUtil.SimpleMessage[] messages = 
RaftTestUtil.SimpleMessage.create(numMessages);
     final RaftClient client = cluster.createClient();
@@ -115,5 +126,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
       futures[i].join();
     }
     Assert.assertTrue(blockedRequestsCount.get() == 0);
+    cluster.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
deleted file mode 100644
index 7741680..0000000
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.simulation;
-
-import org.apache.ratis.RaftAsyncTests;
-
-public class TestRaftAsyncWithSimulatedRpc extends 
RaftAsyncTests<MiniRaftClusterWithSimulatedRpc>
-    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
index 5b7f13e..b5a35d2 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
@@ -22,7 +22,6 @@ import org.apache.ratis.RaftBasicTests;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.util.LogUtils;
-import org.junit.Test;
 
 import java.io.IOException;
 
@@ -44,8 +43,4 @@ public class TestRaftWithSimulatedRpc extends RaftBasicTests {
     return cluster;
   }
 
-  @Test
-  public void testBasicAppendEntriesAsync() throws Exception {
-    super.testBasicAppendEntries(true);
-  }
 }

Reply via email to