Repository: incubator-ratis Updated Branches: refs/heads/master 5b4bc0e7d -> 7b3a9a6f5
RATIS-192. In gRPC, appendEntries replies were received out of order on the Leader. Contributed by Mukul Kumar Singh

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7b3a9a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7b3a9a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7b3a9a6f

Branch: refs/heads/master
Commit: 7b3a9a6f5f8e8075727d84e3ddeae7b594eda89c
Parents: 5b4bc0e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Fri Jan 19 18:20:56 2018 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Fri Jan 19 18:20:56 2018 +0800

----------------------------------------------------------------------
 .../ratis/grpc/server/RaftServerProtocolService.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7b3a9a6f/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
index a7a6990..1e499ae 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -27,6 +27,8 @@ import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
@@ -62,16 +64,27 @@ public class RaftServerProtocolService extends
RaftServerProtocolServiceImplBase
   public StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseObserver) {
     return new StreamObserver<AppendEntriesRequestProto>() {
+      private final AtomicReference<CompletableFuture<Void>> previousOnNext =
+          new AtomicReference<>(CompletableFuture.completedFuture(null));
+
       @Override
       public void onNext(AppendEntriesRequestProto request) {
+        final CompletableFuture<Void> current = new CompletableFuture<>();
+        final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
         try {
-          server.appendEntriesAsync(request).thenAccept(responseObserver::onNext);
+          server.appendEntriesAsync(request).thenCombine(previous,
+              (reply, v) -> {
+                responseObserver.onNext(reply);
+                current.complete(null);
+                return null;
+              });
         } catch (Throwable e) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("{} got exception when appendEntries {}: {}", getId(),
                 ProtoUtils.toString(request.getServerRequest()), e);
           }
           responseObserver.onError(RaftGrpcUtil.wrapException(e));
+          current.completeExceptionally(e);
         }
       }
