This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ad6d7c3  RATIS-728. TimeoutScheduler for GrpcLogAppender holds on to 
the AppendEntryRequest till it times out even though request succeeds. 
Contributed by Tsz-wo Sze.
ad6d7c3 is described below

commit ad6d7c3ea63e6b728ae5121582d02e2709311e91
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Thu Oct 24 15:40:50 2019 +0530

    RATIS-728. TimeoutScheduler for GrpcLogAppender holds on to the 
AppendEntryRequest till it times out even though request succeeds. Contributed 
by Tsz-wo Sze.
---
 .../grpc/client/GrpcClientProtocolClient.java      | 13 ++--
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 86 ++++++++++++++--------
 2 files changed, 61 insertions(+), 38 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index d6b9e13..be99bb5 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -302,7 +302,8 @@ public class GrpcClientProtocolClient implements Closeable {
     }
 
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
-      final CompletableFuture<RaftClientReply> f = 
replies.putNew(request.getCallId());
+      final long callId = request.getCallId();
+      final CompletableFuture<RaftClientReply> f = replies.putNew(callId);
       if (f == null) {
         return JavaUtils.completeExceptionally(new 
AlreadyClosedException(getName() + " is closed."));
       }
@@ -315,14 +316,14 @@ public class GrpcClientProtocolClient implements 
Closeable {
         return f;
       }
 
-      scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), 
LOG,
-          () -> "Timeout check failed for client request: " + request);
+      scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(callId), 
LOG,
+          () -> "Timeout check failed for client request #" + callId);
       return f;
     }
 
-    private void timeoutCheck(RaftClientRequest request) {
-      handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
-          new TimeoutIOException("Request timeout " + requestTimeoutDuration + 
": " + request)));
+    private void timeoutCheck(long callId) {
+      handleReplyFuture(callId, f -> f.completeExceptionally(
+          new TimeoutIOException("Request #" + callId + " timeout " + 
requestTimeoutDuration)));
     }
 
     private void handleReplyFuture(long callId, 
Consumer<CompletableFuture<RaftClientReply>> handler) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 4e1e865..0464ff4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -83,14 +83,16 @@ public class GrpcLogAppender extends LogAppender {
     return rpcService.getProxies().getProxy(getFollowerId());
   }
 
-  private synchronized void resetClient(AppendEntriesRequestProto request) {
+  private synchronized void resetClient(AppendEntriesRequest request) {
     rpcService.getProxies().resetProxy(getFollowerId());
     appendLogRequestObserver = null;
     firstResponseReceived = false;
 
     // clear the pending requests queue and reset the next index of follower
-    final long nextIndex = request != null && request.hasPreviousLog()?
-        request.getPreviousLog().getIndex() + 1: follower.getMatchIndex() + 1;
+    final long nextIndex = 1 + Optional.ofNullable(request)
+        .map(AppendEntriesRequest::getPreviousLog)
+        .map(TermIndex::getIndex)
+        .orElseGet(follower::getMatchIndex);
     pendingRequests.clear();
     follower.decreaseNextIndex(nextIndex);
   }
@@ -166,20 +168,21 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   private void appendLog() throws IOException {
+    final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
     final StreamObserver<AppendEntriesRequestProto> s;
     synchronized (this) {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      AppendEntriesRequestProto pending = createRequest(callId++);
+      pending = createRequest(callId++);
       if (pending == null) {
         return;
       }
       grpcServerMetrics.onRequestCreate();
       request = new AppendEntriesRequest(pending,
           
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(getFollowerId().toString()));
-      pendingRequests.put(pending.getServerRequest().getCallId(), request);
+      pendingRequests.put(request.getCallId(), request);
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
@@ -188,26 +191,24 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     if (isAppenderRunning()) {
-      sendRequest(request, s);
+      sendRequest(request, pending, s);
     }
   }
 
-  private void sendRequest(AppendEntriesRequest request, 
StreamObserver<AppendEntriesRequestProto> s) {
+  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto, StreamObserver<AppendEntriesRequestProto> s) {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
-        server.getId(), null, request);
-    AppendEntriesRequestProto requestProto = request.getRequestProto();
+        server.getId(), null, proto);
     request.startRequestTimer();
-    s.onNext(requestProto);
-    scheduler.onTimeout(requestTimeoutDuration, () -> 
timeoutAppendRequest(requestProto), LOG,
+    s.onNext(proto);
+    scheduler.onTimeout(requestTimeoutDuration, () -> 
timeoutAppendRequest(request.getCallId()), LOG,
         () -> "Timeout check failed for append entry request: " + request);
     follower.updateLastRpcSendTime();
   }
 
-  private void timeoutAppendRequest(AppendEntriesRequestProto request) {
-    AppendEntriesRequest pendingRequest = 
pendingRequests.remove(request.getServerRequest().getCallId());
-    if (pendingRequest != null) {
-      LOG.warn( "{}: appendEntries Timeout, request={}", this,
-          ServerProtoUtils.toString(pendingRequest.getRequestProto()));
+  private void timeoutAppendRequest(long callId) {
+    final AppendEntriesRequest pending = pendingRequests.remove(callId);
+    if (pending != null) {
+      LOG.warn( "{}: appendEntries Timeout, request={}", this, pending);
     }
   }
 
@@ -235,23 +236,22 @@ public class GrpcLogAppender extends LogAppender {
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
       final AppendEntriesRequest request = 
pendingRequests.remove(reply.getServerReply().getCallId());
-      AppendEntriesRequestProto requestProto = request.getRequestProto();
       if (LOG.isDebugEnabled()) {
         LOG.debug("{}: received {} reply {}, request={}",
             this, firstResponseReceived? "a": "the first",
-            ServerProtoUtils.toString(reply), 
ServerProtoUtils.toString(requestProto));
+            ServerProtoUtils.toString(reply), request);
       }
       request.stopRequestTimer(); // Update completion time
 
       try {
-        onNextImpl(requestProto, reply);
+        onNextImpl(request, reply);
       } catch(Throwable t) {
-        LOG.error("Failed onNext request=" + 
ServerProtoUtils.toString(requestProto)
+        LOG.error("Failed onNext request=" + request
             + ", reply=" + ServerProtoUtils.toString(reply), t);
       }
     }
 
-    private void onNextImpl(AppendEntriesRequestProto request, 
AppendEntriesReplyProto reply) {
+    private void onNextImpl(AppendEntriesRequest request, 
AppendEntriesReplyProto reply) {
       // update the last rpc time
       follower.updateLastRpcResponseTime();
 
@@ -300,7 +300,7 @@ public class GrpcLogAppender extends LogAppender {
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
       long callId = GrpcUtil.getCallId(t);
-      resetClient(pendingRequests.remove(callId).getRequestProto());
+      resetClient(pendingRequests.remove(callId));
     }
 
     @Override
@@ -315,10 +315,8 @@ public class GrpcLogAppender extends LogAppender {
     }
   }
 
-  private boolean checkAndUpdateMatchIndex(AppendEntriesRequestProto request) {
-    final int n = request.getEntriesCount();
-    final long newMatchIndex = n == 0? request.getPreviousLog().getIndex(): 
request.getEntries(n - 1).getIndex();
-    return follower.updateMatchIndex(newMatchIndex);
+  private boolean checkAndUpdateMatchIndex(AppendEntriesRequest request) {
+    return follower.updateMatchIndex(request.getNewMatchIndex());
   }
 
   private synchronized void updateNextIndex(long replyNextIndex) {
@@ -521,17 +519,36 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   static class AppendEntriesRequest {
-    private final AppendEntriesRequestProto requestProto;
     private final Timer timer;
-    private Timer.Context timerContext;
+    private volatile Timer.Context timerContext;
+
+    private final long callId;
+    private final TermIndex previousLog;
+    private final int entriesCount;
+
+    private final TermIndex lastEntry;
+
+    AppendEntriesRequest(AppendEntriesRequestProto proto, Timer timer) {
+      this.callId = proto.getServerRequest().getCallId();
+      this.previousLog = proto.hasPreviousLog()? 
ServerProtoUtils.toTermIndex(proto.getPreviousLog()): null;
+      this.entriesCount = proto.getEntriesCount();
+      this.lastEntry = entriesCount > 0? 
ServerProtoUtils.toTermIndex(proto.getEntries(entriesCount - 1)): null;
 
-    AppendEntriesRequest(AppendEntriesRequestProto requestProto, Timer timer) {
-      this.requestProto = requestProto;
       this.timer = timer;
     }
 
-    AppendEntriesRequestProto getRequestProto() {
-      return requestProto;
+    long getCallId() {
+      return callId;
+    }
+
+    TermIndex getPreviousLog() {
+      return previousLog;
+    }
+
+    long getNewMatchIndex() {
+      return lastEntry != null? lastEntry.getIndex()
+          : previousLog != null? previousLog.getIndex()
+          : 0;
     }
 
     void startRequestTimer() {
@@ -541,5 +558,10 @@ public class GrpcLogAppender extends LogAppender {
     void stopRequestTimer() {
       timerContext.stop();
     }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + ":cid=" + callId + ",entriesCount=" 
+ entriesCount + ",lastEntry=" + lastEntry;
+    }
   }
 }

Reply via email to