Repository: incubator-ratis Updated Branches: refs/heads/master 2fb731f91 -> a7b29658f
RATIS-143. RaftClientImpl should have upper bound on async requests. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a7b29658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a7b29658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a7b29658 Branch: refs/heads/master Commit: a7b29658ff7c656cb566680fa3c9cc10da5f1d0a Parents: 2fb731f Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Thu Nov 23 17:07:28 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Thu Nov 23 17:07:28 2017 -0800 ---------------------------------------------------------------------- .../ratis/client/impl/RaftClientImpl.java | 23 +++- .../ratis/client/impl/RaftClientTestUtil.java | 28 +++++ .../ratis/grpc/TestRaftAsyncWithGrpc.java | 24 ++++ .../apache/ratis/server/impl/RetryCache.java | 6 +- .../java/org/apache/ratis/RaftAsyncTests.java | 119 +++++++++++++++++++ .../TestRaftAsyncWithSimulatedRpc.java | 24 ++++ .../SimpleStateMachine4Testing.java | 22 ++++ 7 files changed, 240 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index ea2a3bc..fd1631d 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -21,6 +21,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.CollectionUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.*; @@ -49,13 +50,14 @@ final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + private final Semaphore asyncRequestSemaphore = new Semaphore(100); RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval) { this.clientId = clientId; this.clientRpc = clientRpc; - this.peers = new ArrayList<>(group.getPeers()); + this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); this.groupId = group.getGroupId(); this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; @@ -82,15 +84,21 @@ final class RaftClientImpl implements RaftClient { private CompletableFuture<RaftClientReply> sendAsync(Message message, boolean readOnly) { Objects.requireNonNull(message, "message == null"); + try { + asyncRequestSemaphore.acquire(); + } catch (InterruptedException e) { + throw new CompletionException(IOUtils.toInterruptedIOException( + "Interrupted when sending " + message, e)); + } final long callId = nextCallId(); return sendRequestWithRetryAsync( () -> new RaftClientRequest(clientId, leaderId, groupId, callId, message, readOnly) - ).thenApplyAsync(reply -> { + ).thenApply(reply -> { if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) { throw new CompletionException(reply.getException()); } return reply; - }); + }).whenComplete((r, e) -> asyncRequestSemaphore.release()); } @Override @@ -188,7 +196,7 @@ final class RaftClientImpl implements RaftClient { private CompletableFuture<RaftClientReply> sendRequestAsync( RaftClientRequest request) { LOG.debug("{}: sendAsync {}", clientId, request); - return clientRpc.sendRequestAsync(request).thenApplyAsync(reply -> { + return clientRpc.sendRequestAsync(request).thenApply(reply -> { LOG.debug("{}: receive {}", clientId, reply); if (reply != null && reply.isNotLeader()) { handleNotLeaderException(request, reply.getNotLeaderException()); @@ -197,7 +205,7 @@ final class RaftClientImpl implements RaftClient { return reply; }).exceptionally(e -> { final Throwable cause = e.getCause(); - if (cause instanceof RaftException) { + if (cause instanceof GroupMismatchException) { return new RaftClientReply(request, (RaftException) cause); } else if (cause instanceof IOException) { handleIOException(request, (IOException) cause, null); @@ -269,6 +277,11 @@ final class RaftClientImpl implements RaftClient { } } + void assertAsyncRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) { + Preconditions.assertTrue(asyncRequestSemaphore.availablePermits() == expectedAvailablePermits); + Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == expectedQueueLength); + } + @Override public RaftClientRpc getClientRpc() { return clientRpc; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java new file mode 100644 index 0000000..50cf914 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client.impl; + +import org.apache.ratis.client.RaftClient; + +/** Interface for testing raft client. */ +public interface RaftClientTestUtil { + static void assertAsyncRequestSemaphore( + RaftClient client, int expectedAvailablePermits, int expectedQueueLength) { + ((RaftClientImpl) client).assertAsyncRequestSemaphore(expectedAvailablePermits, expectedQueueLength); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java new file mode 100644 index 0000000..752a3dd --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.RaftAsyncTests; + +public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGRpc> + implements MiniRaftClusterWithGRpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java index 4e65124..afe2a7e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java @@ -99,6 +99,10 @@ public class RetryCache implements Closeable { return isFailed() || replyFuture.isDone(); } + boolean isCompletedNormally() { + return !failed && replyFuture.isDone() && !replyFuture.isCompletedExceptionally() && !replyFuture.isCancelled(); + } + void updateResult(RaftClientReply reply) { assert !replyFuture.isDone() && !replyFuture.isCancelled(); replyFuture.complete(reply); @@ -162,7 +166,7 @@ public class RetryCache implements Closeable { } catch (ExecutionException e) { throw new IllegalStateException(e); } - Preconditions.assertTrue(entry != null && !entry.isDone(), + Preconditions.assertTrue(entry != null && !entry.isCompletedNormally(), "retry cache entry should be pending: %s", entry); return entry; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java new file mode 100644 index 0000000..336ba36 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.impl.RaftClientTestUtil; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.ratis.RaftTestUtil.waitForLeader; + +public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + { + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + } + + private static final Logger LOG = LoggerFactory.getLogger(RaftAsyncTests.class); + + public static final int NUM_SERVERS = 5; + + private CLUSTER cluster = null; + + @Before + public void setup() throws IOException { + cluster = newCluster(NUM_SERVERS); + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } + + @After + public void tearDown() { + cluster.shutdown(); + } + + @Test + public void testAsyncRequestSemaphore() + throws InterruptedException, IOException { + LOG.info("Running testBasicAppendEntries"); + waitForLeader(cluster); + + int numMessages = 100; + CompletableFuture[] futures = new CompletableFuture[numMessages + 1]; + final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages); + final RaftClient client = cluster.createClient(); + //Set blockTransaction flag so that transaction blocks + for (RaftServerProxy server : cluster.getServers()) { + ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(true); + } + + //Send numMessages which are blocked and do not release the client semaphore permits + AtomicInteger blockedRequestsCount = new AtomicInteger(); + for (int i=0; i<numMessages; i++) { + blockedRequestsCount.getAndIncrement(); + futures[i] = client.sendAsync(messages[i]); + blockedRequestsCount.decrementAndGet(); + } + Assert.assertTrue(blockedRequestsCount.get() == 0); + + ExecutorService threadPool = Executors.newFixedThreadPool(1); + futures[numMessages] = CompletableFuture.supplyAsync(() -> { + blockedRequestsCount.incrementAndGet(); + client.sendAsync(new RaftTestUtil.SimpleMessage("n1")); + blockedRequestsCount.decrementAndGet(); + return null; + }, threadPool); + + //Allow the last msg to be sent + while (blockedRequestsCount.get() != 1) { + Thread.sleep(1000); + } + Assert.assertTrue(blockedRequestsCount.get() == 1); + //Since all semaphore permits are acquired the last message sent is in queue + RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1); + + //Unset the blockTransaction flag so that semaphore permits can be released + for (RaftServerProxy server : cluster.getServers()) { + ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(false); + } + for(int i=0; i<=numMessages; i++){ + futures[i].join(); + } + Assert.assertTrue(blockedRequestsCount.get() == 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java new file mode 100644 index 0000000..7741680 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftAsyncWithSimulatedRpc.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.RaftAsyncTests; + +public class TestRaftAsyncWithSimulatedRpc extends RaftAsyncTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a7b29658/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index a6e6672..d041fcf 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -45,6 +45,7 @@ import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; /** * A {@link StateMachine} implementation example that simply stores all the log @@ -77,6 +78,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); private volatile boolean running = true; + private boolean blockTransaction = false; + private final Semaphore blockingSemaphore = new Semaphore(1); private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; SimpleStateMachine4Testing() { @@ -225,6 +228,16 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public TransactionContext startTransaction(RaftClientRequest request) throws IOException { + if (blockTransaction) { + try { + //blocks until blockTransaction is set to false + blockingSemaphore.acquire(); + blockingSemaphore.release(); + } catch (InterruptedException e) { + LOG.error("Could not block applyTransaction", e); + Thread.currentThread().interrupt(); + } + } return new TransactionContext(this, request, SMLogEntryProto.newBuilder() .setData(request.getMessage().getContent()) .build()); @@ -246,4 +259,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { public LogEntryProto[] getContent() { return list.toArray(new LogEntryProto[list.size()]); } + + public void setBlockTransaction(boolean blockTransactionVal) throws InterruptedException { + this.blockTransaction = blockTransactionVal; + if (blockTransactionVal) { + blockingSemaphore.acquire(); + } else { + blockingSemaphore.release(); + } + } }
