Repository: incubator-ratis Updated Branches: refs/heads/master 564e89ee4 -> 86e744c1f
RATIS-113. Add Async send interface to RaftClient. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/86e744c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/86e744c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/86e744c1 Branch: refs/heads/master Commit: 86e744c1f1d50a2b3530dfe568def4eb33aa6e05 Parents: 564e89e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Nov 14 13:28:21 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Nov 14 13:28:21 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 11 +++ .../org/apache/ratis/client/RaftClientRpc.java | 20 ++++- .../ratis/client/impl/ClientImplUtils.java | 3 - .../ratis/client/impl/RaftClientImpl.java | 65 +++++++++++++- .../apache/ratis/protocol/RaftClientReply.java | 8 ++ .../apache/ratis/grpc/client/GrpcClientRpc.java | 91 ++++++++++++-------- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 5 ++ .../java/org/apache/ratis/RaftBasicTests.java | 32 ++++++- .../java/org/apache/ratis/RaftTestUtil.java | 11 ++- .../simulation/TestRaftWithSimulatedRpc.java | 6 ++ 10 files changed, 204 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 6e1e5c1..4b152cb 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; /** A client who sends requests to a raft service. */ public interface RaftClient extends Closeable { @@ -42,6 +43,16 @@ public interface RaftClient extends Closeable { RaftClientRpc getClientRpc(); /** + * Async call to send the given message to the raft service. + * The message may change the state of the service. + * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead. + */ + CompletableFuture<RaftClientReply> sendAsync(Message message); + + /** Async call to send the given readonly message to the raft service. */ + CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message); + + /** * Send the given message to the raft service. * The message may change the state of the service. * For readonly messages, use {@link #sendReadOnly(Message)} instead. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java index 90e9570..310f9df 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -17,16 +17,30 @@ */ package org.apache.ratis.client; -import java.io.Closeable; -import java.io.IOException; - import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + /** The client side rpc of a raft service. */ public interface RaftClientRpc extends Closeable { + /** Async call to send a request. */ + default CompletableFuture<RaftClientReply> sendRequestAsync( + RaftClientRequest request) { + return CompletableFuture.supplyAsync(() -> { + try { + return sendRequest(request); + } catch (Exception e) { + throw new CompletionException(e); + } + }); + } + /** Send a request. */ RaftClientReply sendRequest(RaftClientRequest request) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 07b07b0..2ae2f35 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -22,11 +22,8 @@ import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; -import java.util.Collection; - /** Client utilities for internal use. */ public class ClientImplUtils { public static RaftClient newRaftClient( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 8a0ddef..ea2a3bc 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -19,7 +19,6 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.shaded.com.google.common.base.Predicates; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.TimeDuration; @@ -28,9 +27,9 @@ import org.apache.ratis.protocol.*; import java.io.IOException; import java.io.InterruptedIOException; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; /** A client who sends requests to a raft service. */ @@ -49,6 +48,8 @@ final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval) { @@ -69,6 +70,30 @@ final class RaftClientImpl implements RaftClient { } @Override + public CompletableFuture<RaftClientReply> sendAsync(Message message) { + return sendAsync(message, false); + } + + @Override + public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) { + return sendAsync(message, true); + } + + private CompletableFuture<RaftClientReply> sendAsync(Message message, + boolean readOnly) { + Objects.requireNonNull(message, "message == null"); + final long callId = nextCallId(); + return sendRequestWithRetryAsync( + () -> new RaftClientRequest(clientId, leaderId, groupId, callId, message, readOnly) + ).thenApplyAsync(reply -> { + if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) { + throw new CompletionException(reply.getException()); + } + return reply; + }); + } + + @Override public RaftClientReply send(Message message) throws IOException { return send(message, false); } @@ -124,6 +149,21 @@ final class RaftClientImpl implements RaftClient { peersInNewConf.filter(p -> !peers.contains(p))::iterator); } + private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync( + Supplier<RaftClientRequest> supplier) { + return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> { + final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); + if (reply == null) { + final TimeUnit unit = retryInterval.getUnit(); + scheduler.schedule(() -> sendRequestWithRetryAsync(supplier) + .thenApply(r -> f.complete(r)), retryInterval.toLong(unit), unit); + } else { + f.complete(reply); + } + return f; + }); + } + private RaftClientReply sendRequestWithRetry( Supplier<RaftClientRequest> supplier) throws InterruptedIOException, StateMachineException, GroupMismatchException { @@ -145,6 +185,27 @@ final class RaftClientImpl implements RaftClient { } } + private CompletableFuture<RaftClientReply> sendRequestAsync( + RaftClientRequest request) { + LOG.debug("{}: sendAsync {}", clientId, request); + return clientRpc.sendRequestAsync(request).thenApplyAsync(reply -> { + LOG.debug("{}: receive {}", clientId, reply); + if (reply != null && reply.isNotLeader()) { + handleNotLeaderException(request, reply.getNotLeaderException()); + return null; + } + return reply; + }).exceptionally(e -> { + final Throwable cause = e.getCause(); + if (cause instanceof RaftException) { + return new RaftClientReply(request, (RaftException) cause); + } else if (cause instanceof IOException) { + handleIOException(request, (IOException) cause, null); + } + return null; + }); + } + private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException, GroupMismatchException { LOG.debug("{}: send {}", clientId, request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 60dc6c1..ea59352 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -93,4 +93,12 @@ public class RaftClientReply extends RaftClientMessage { public boolean hasStateMachineException() { return exception instanceof StateMachineException; } + + public boolean hasGroupMismatchException(){ + return exception instanceof GroupMismatchException; + } + + public RaftException getException(){ + return exception; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 3084289..2b7de70 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -51,6 +51,20 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie } @Override + public CompletableFuture<RaftClientReply> sendRequestAsync( + RaftClientRequest request) { + final RaftPeerId serverId = request.getServerId(); + try { + return sendRequestAsync(request, getProxies().getProxy(serverId)); + } catch (IOException e) { + final CompletableFuture<RaftClientReply> replyFuture = + new CompletableFuture<>(); + replyFuture.completeExceptionally(e); + return replyFuture; + } + } + + @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); @@ -74,42 +88,11 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie throw new IOException("msg size:" + requestProto.getSerializedSize() + " exceeds maximum:" + maxMessageSize); } - CompletableFuture<RaftClientReplyProto> replyFuture = - new CompletableFuture<>(); - final StreamObserver<RaftClientRequestProto> requestObserver = - proxy.append(new StreamObserver<RaftClientReplyProto>() { - @Override - public void onNext(RaftClientReplyProto value) { - replyFuture.complete(value); - } - - @Override - public void onError(Throwable t) { - // This implementation is used as RaftClientRpc. Retry - // logic on Exception is in RaftClient. - final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = IOUtils.asIOException(t); - } - replyFuture.completeExceptionally(e); - } - - @Override - public void onCompleted() { - if (!replyFuture.isDone()) { - replyFuture.completeExceptionally( - new IOException("No reply for request " + request)); - } - } - }); - requestObserver.onNext(requestProto); - requestObserver.onCompleted(); - + final CompletableFuture<RaftClientReply> replyFuture = + sendRequestAsync(request, proxy); // TODO: timeout support try { - return toRaftClientReply(replyFuture.get()); + return replyFuture.get(); } catch (InterruptedException e) { throw new InterruptedIOException( "Interrupted while waiting for response of request " + request); @@ -118,4 +101,44 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie } } } + + private CompletableFuture<RaftClientReply> sendRequestAsync( + RaftClientRequest request, RaftClientProtocolClient proxy) { + final RaftClientRequestProto requestProto = + toRaftClientRequestProto(request); + final CompletableFuture<RaftClientReplyProto> replyFuture = + new CompletableFuture<>(); + final StreamObserver<RaftClientRequestProto> requestObserver = + proxy.append(new StreamObserver<RaftClientReplyProto>() { + @Override + public void onNext(RaftClientReplyProto value) { + replyFuture.complete(value); + } + + @Override + public void onError(Throwable t) { + // This implementation is used as RaftClientRpc. Retry + // logic on Exception is in RaftClient. + final IOException e; + if (t instanceof StatusRuntimeException) { + e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); + } else { + e = IOUtils.asIOException(t); + } + replyFuture.completeExceptionally(e); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("No reply for request " + request)); + } + } + }); + requestObserver.onNext(requestProto); + requestObserver.onCompleted(); + + return replyFuture.thenApply(replyProto -> toRaftClientReply(replyProto)); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 76a64b3..2657bd1 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -56,6 +56,11 @@ public class TestRaftWithGrpc extends RaftBasicTests { BlockRequestHandlingInjection.getInstance().unblockAll(); } + @Test + public void testBasicAppendEntriesAsync() throws Exception { + super.testBasicAppendEntries(true); + } + @Override @Test public void testWithLoad() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 2647d8f..89c40d0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -112,6 +113,10 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testBasicAppendEntries() throws Exception { + testBasicAppendEntries(false); + } + + protected void testBasicAppendEntries(boolean async) throws Exception { LOG.info("Running testBasicAppendEntries"); final MiniRaftCluster cluster = getCluster(); RaftServerImpl leader = waitForLeader(cluster); @@ -121,9 +126,28 @@ public abstract class RaftBasicTests extends BaseTest { LOG.info(cluster.printServers()); final SimpleMessage[] messages = SimpleMessage.create(10); - try(final RaftClient client = cluster.createClient()) { + + try (final RaftClient client = cluster.createClient()) { + final AtomicInteger asyncReplyCount = new AtomicInteger(); + final CompletableFuture<Void> f = new CompletableFuture<>(); + for (SimpleMessage message : messages) { - client.send(message); + if (async) { + client.sendAsync(message).thenAcceptAsync(reply -> { + if (!reply.isSuccess()) { + f.completeExceptionally( + new AssertionError("Failed with reply " + reply)); + } else if (asyncReplyCount.incrementAndGet() == messages.length) { + f.complete(null); + } + }); + } else { + client.send(message); + } + } + if (async) { + f.join(); + Assert.assertEquals(messages.length, asyncReplyCount.get()); } } @@ -131,7 +155,7 @@ public abstract class RaftBasicTests extends BaseTest { LOG.info(cluster.printAllLogs()); cluster.getServerAliveStream().map(s -> s.getState().getLog()) - .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages)); + .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages)); } @Test @@ -171,7 +195,7 @@ public abstract class RaftBasicTests extends BaseTest { Assert.assertEquals(followerToSendLog.getId(), newLeaderId); cluster.getServerAliveStream().map(s -> s.getState().getLog()) - .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages)); + .forEach(log -> RaftTestUtil.assertLogEntries(log, false, term, messages)); LOG.info("terminating testOldLeaderCommit test"); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index d1e614c..c8dfc0d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -165,7 +165,7 @@ public interface RaftTestUtil { } } - static void assertLogEntries(RaftLog log, long expectedTerm, + static void assertLogEntries(RaftLog log, boolean async, long expectedTerm, SimpleMessage... expectedMessages) { final TermIndex[] termIndices = log.getEntries(1, Long.MAX_VALUE); @@ -189,6 +189,11 @@ public interface RaftTestUtil { } } + if (async) { + Collections.sort(entries, Comparator + .comparing(e -> e.getSmLogEntry().getData().toStringUtf8())); + } + long logIndex = 0; Assert.assertEquals(expectedMessages.length, entries.size()); for (int i = 0; i < expectedMessages.length; i++) { @@ -197,7 +202,9 @@ public interface RaftTestUtil { if (e.getTerm() > expectedTerm) { expectedTerm = e.getTerm(); } - Assert.assertTrue(e.getIndex() > logIndex); + if (!async) { + Assert.assertTrue(e.getIndex() > logIndex); + } logIndex = e.getIndex(); Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(), e.getSmLogEntry().getData().toByteArray()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java index c1136b7..5b7f13e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java @@ -22,6 +22,7 @@ import org.apache.ratis.RaftBasicTests; import org.apache.ratis.client.RaftClient; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.LogUtils; +import org.junit.Test; import java.io.IOException; @@ -42,4 +43,9 @@ public class TestRaftWithSimulatedRpc extends RaftBasicTests { public MiniRaftClusterWithSimulatedRpc getCluster() { return cluster; } + + @Test + public void testBasicAppendEntriesAsync() throws Exception { + super.testBasicAppendEntries(true); + } }
