Repository: incubator-ratis Updated Branches: refs/heads/master c80423652 -> 5de0fa647
RATIS-156. Implement configuration for client async requests. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/5de0fa64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/5de0fa64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/5de0fa64 Branch: refs/heads/master Commit: 5de0fa6473bc186926840154e7cf534b3f9839b1 Parents: c804236 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Sat Nov 25 20:37:11 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Sat Nov 25 20:37:11 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 2 +- .../ratis/client/RaftClientConfigKeys.java | 28 +++++++++++ .../org/apache/ratis/client/RaftClientRpc.java | 12 +---- .../ratis/client/impl/ClientImplUtils.java | 9 ++-- .../ratis/client/impl/RaftClientImpl.java | 18 ++++--- .../ratis/client/impl/RaftClientTestUtil.java | 4 ++ .../java/org/apache/ratis/RaftAsyncTests.java | 52 ++++++++++++-------- .../TestRaftAsyncWithSimulatedRpc.java | 24 --------- .../simulation/TestRaftWithSimulatedRpc.java | 5 -- 9 files changed, 84 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 4b152cb..d2c5c1a 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -106,7 +106,7 @@ public interface RaftClient extends Closeable { Objects.requireNonNull(group, "The 'group' field is not initialized."), leaderId, Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."), - retryInterval); + retryInterval, properties); } /** Set {@link RaftClient} ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index 9e7bd76..03f12cb 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -39,6 +39,34 @@ public interface RaftClientConfigKeys { } } + interface Async { + String PREFIX = RaftClientConfigKeys.PREFIX + ".async"; + + String MAX_OUTSTANDING_REQUESTS_KEY = PREFIX + ".outstanding-requests.max"; + int MAX_OUTSTANDING_REQUESTS_DEFAULT = 100; + + static int maxOutstandingRequests(RaftProperties properties) { + return getInt(properties::getInt, MAX_OUTSTANDING_REQUESTS_KEY, + MAX_OUTSTANDING_REQUESTS_DEFAULT, requireMin(2)); + } + + static void setMaxOutstandingRequests(RaftProperties properties, int outstandingRequests) { + setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, outstandingRequests); + } + + String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads"; + int SCHEDULER_THREADS_DEFAULT = 3; + + static int schedulerThreads(RaftProperties properties) { + return getInt(properties::getInt, SCHEDULER_THREADS_KEY, + SCHEDULER_THREADS_DEFAULT); + } + + static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) { + setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads); + } + } + static void main(String[] args) { printAll(RaftClientConfigKeys.class); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java index 310f9df..51f4430 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -25,20 +25,12 @@ import org.apache.ratis.protocol.RaftPeerId; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; /** The client side rpc of a raft service. */ public interface RaftClientRpc extends Closeable { /** Async call to send a request. */ - default CompletableFuture<RaftClientReply> sendRequestAsync( - RaftClientRequest request) { - return CompletableFuture.supplyAsync(() -> { - try { - return sendRequest(request); - } catch (Exception e) { - throw new CompletionException(e); - } - }); + default CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) { + throw new UnsupportedOperationException(getClass() + " does not support this method."); } /** Send a request. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 2ae2f35..e7c89f3 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -19,6 +19,7 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.ClientId; @@ -26,9 +27,9 @@ import org.apache.ratis.protocol.RaftPeerId; /** Client utilities for internal use. */ public class ClientImplUtils { - public static RaftClient newRaftClient( - ClientId clientId, RaftGroup group, RaftPeerId leaderId, - RaftClientRpc clientRpc, TimeDuration retryInterval) { - return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval); + public static RaftClient newRaftClient(ClientId clientId, RaftGroup group, + RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval, + RaftProperties properties) { + return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval, properties); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index fd1631d..906d55f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -18,7 +18,9 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Preconditions; @@ -49,12 +51,11 @@ final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); - private final Semaphore asyncRequestSemaphore = new Semaphore(100); + private final ScheduledExecutorService scheduler; + private final Semaphore asyncRequestSemaphore; - RaftClientImpl(ClientId clientId, RaftGroup group, - RaftPeerId leaderId, RaftClientRpc clientRpc, - TimeDuration retryInterval) { + RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, + RaftClientRpc clientRpc, TimeDuration retryInterval, RaftProperties properties) { this.clientId = clientId; this.clientRpc = clientRpc; this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); @@ -62,7 +63,8 @@ final class RaftClientImpl implements RaftClient { this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; this.retryInterval = retryInterval; - + asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties)); + scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties)); clientRpc.addServers(peers); } @@ -282,6 +284,10 @@ final class RaftClientImpl implements RaftClient { Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == expectedQueueLength); } + void assertScheduler(int numThreads) { + Preconditions.assertTrue(((ScheduledThreadPoolExecutor) scheduler).getCorePoolSize() == numThreads); + } + @Override public RaftClientRpc getClientRpc() { return clientRpc; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java index 50cf914..cab2dd0 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java @@ -25,4 +25,8 @@ public interface RaftClientTestUtil { RaftClient client, int expectedAvailablePermits, int expectedQueueLength) { ((RaftClientImpl) client).assertAsyncRequestSemaphore(expectedAvailablePermits, expectedQueueLength); } + + static void assertScheduler(RaftClient client, int numThreads){ + ((RaftClientImpl) client).assertScheduler(numThreads); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 336ba36..78c95c7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -19,16 +19,16 @@ package org.apache.ratis; import org.apache.log4j.Level; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.LogUtils; import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.*; @@ -43,37 +43,48 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } - { - final RaftProperties p = getProperties(); - p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, - SimpleStateMachine4Testing.class, StateMachine.class); - } - - private static final Logger LOG = LoggerFactory.getLogger(RaftAsyncTests.class); + private RaftProperties properties; - public static final int NUM_SERVERS = 5; - - private CLUSTER cluster = null; + public static final int NUM_SERVERS = 3; @Before public void setup() throws IOException { - cluster = newCluster(NUM_SERVERS); - Assert.assertNull(cluster.getLeader()); - cluster.start(); + properties = new RaftProperties(); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); } - @After - public void tearDown() { - cluster.shutdown(); + @Test + public void testAsyncConfiguration(){ + LOG.info("Running testAsyncConfiguration"); + RaftClient.Builder clientBuilder = RaftClient.newBuilder() + .setRaftGroup(RaftGroup.emptyGroup()) + .setProperties(properties); + RaftClient client = clientBuilder.build(); + int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT; + int maxOutstandingRequests = RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT; + RaftClientTestUtil.assertScheduler(client, numThreads); + RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); + + numThreads = 200; + maxOutstandingRequests = 5; + RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, maxOutstandingRequests); + RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads); + client = clientBuilder.build(); + RaftClientTestUtil.assertScheduler(client, numThreads); + RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); } @Test public void testAsyncRequestSemaphore() throws InterruptedException, IOException { - LOG.info("Running testBasicAppendEntries"); + LOG.info("Running testAsyncRequestSemaphore"); + CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties); + Assert.assertNull(cluster.getLeader()); + cluster.start(); waitForLeader(cluster); - int numMessages = 100; + int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(properties); CompletableFuture[] futures = new CompletableFuture[numMessages + 1]; final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages); final RaftClient client = cluster.createClient(); @@ -115,5 +126,6 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba futures[i].join(); } Assert.assertTrue(blockedRequestsCount.get() == 0); + cluster.shutdown(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java deleted file mode 100644 index 7741680..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.simulation; - -import org.apache.ratis.RaftAsyncTests; - -public class TestRaftAsyncWithSimulatedRpc extends RaftAsyncTests<MiniRaftClusterWithSimulatedRpc> - implements MiniRaftClusterWithSimulatedRpc.FactoryGet { -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5de0fa64/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java index 5b7f13e..b5a35d2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java @@ -22,7 +22,6 @@ import org.apache.ratis.RaftBasicTests; import org.apache.ratis.client.RaftClient; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.LogUtils; -import org.junit.Test; import java.io.IOException; @@ -44,8 +43,4 @@ public class TestRaftWithSimulatedRpc extends RaftBasicTests { return cluster; } - @Test - public void testBasicAppendEntriesAsync() throws Exception { - super.testBasicAppendEntries(true); - } }
