This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ad6d7c3 RATIS-728. TimeoutScheduler for GrpcLogAppender holds on to
the AppendEntryRequest till it times out even though request succeeds.
Contributed by Tsz-wo Sze.
ad6d7c3 is described below
commit ad6d7c3ea63e6b728ae5121582d02e2709311e91
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Thu Oct 24 15:40:50 2019 +0530
RATIS-728. TimeoutScheduler for GrpcLogAppender holds on to the
AppendEntryRequest till it times out even though request succeeds. Contributed
by Tsz-wo Sze.
---
.../grpc/client/GrpcClientProtocolClient.java | 13 ++--
.../apache/ratis/grpc/server/GrpcLogAppender.java | 86 ++++++++++++++--------
2 files changed, 61 insertions(+), 38 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index d6b9e13..be99bb5 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -302,7 +302,8 @@ public class GrpcClientProtocolClient implements Closeable {
}
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
- final CompletableFuture<RaftClientReply> f =
replies.putNew(request.getCallId());
+ final long callId = request.getCallId();
+ final CompletableFuture<RaftClientReply> f = replies.putNew(callId);
if (f == null) {
return JavaUtils.completeExceptionally(new
AlreadyClosedException(getName() + " is closed."));
}
@@ -315,14 +316,14 @@ public class GrpcClientProtocolClient implements
Closeable {
return f;
}
- scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request),
LOG,
- () -> "Timeout check failed for client request: " + request);
+ scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(callId),
LOG,
+ () -> "Timeout check failed for client request #" + callId);
return f;
}
- private void timeoutCheck(RaftClientRequest request) {
- handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
- new TimeoutIOException("Request timeout " + requestTimeoutDuration +
": " + request)));
+ private void timeoutCheck(long callId) {
+ handleReplyFuture(callId, f -> f.completeExceptionally(
+ new TimeoutIOException("Request #" + callId + " timeout " +
requestTimeoutDuration)));
}
private void handleReplyFuture(long callId,
Consumer<CompletableFuture<RaftClientReply>> handler) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 4e1e865..0464ff4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -83,14 +83,16 @@ public class GrpcLogAppender extends LogAppender {
return rpcService.getProxies().getProxy(getFollowerId());
}
- private synchronized void resetClient(AppendEntriesRequestProto request) {
+ private synchronized void resetClient(AppendEntriesRequest request) {
rpcService.getProxies().resetProxy(getFollowerId());
appendLogRequestObserver = null;
firstResponseReceived = false;
// clear the pending requests queue and reset the next index of follower
- final long nextIndex = request != null && request.hasPreviousLog()?
- request.getPreviousLog().getIndex() + 1: follower.getMatchIndex() + 1;
+ final long nextIndex = 1 + Optional.ofNullable(request)
+ .map(AppendEntriesRequest::getPreviousLog)
+ .map(TermIndex::getIndex)
+ .orElseGet(follower::getMatchIndex);
pendingRequests.clear();
follower.decreaseNextIndex(nextIndex);
}
@@ -166,20 +168,21 @@ public class GrpcLogAppender extends LogAppender {
}
private void appendLog() throws IOException {
+ final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
final StreamObserver<AppendEntriesRequestProto> s;
synchronized (this) {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
- AppendEntriesRequestProto pending = createRequest(callId++);
+ pending = createRequest(callId++);
if (pending == null) {
return;
}
grpcServerMetrics.onRequestCreate();
request = new AppendEntriesRequest(pending,
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(getFollowerId().toString()));
- pendingRequests.put(pending.getServerRequest().getCallId(), request);
+ pendingRequests.put(request.getCallId(), request);
increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
appendLogRequestObserver = getClient().appendEntries(new
AppendLogResponseHandler());
@@ -188,26 +191,24 @@ public class GrpcLogAppender extends LogAppender {
}
if (isAppenderRunning()) {
- sendRequest(request, s);
+ sendRequest(request, pending, s);
}
}
- private void sendRequest(AppendEntriesRequest request,
StreamObserver<AppendEntriesRequestProto> s) {
+ private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto, StreamObserver<AppendEntriesRequestProto> s) {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
- server.getId(), null, request);
- AppendEntriesRequestProto requestProto = request.getRequestProto();
+ server.getId(), null, proto);
request.startRequestTimer();
- s.onNext(requestProto);
- scheduler.onTimeout(requestTimeoutDuration, () ->
timeoutAppendRequest(requestProto), LOG,
+ s.onNext(proto);
+ scheduler.onTimeout(requestTimeoutDuration, () ->
timeoutAppendRequest(request.getCallId()), LOG,
() -> "Timeout check failed for append entry request: " + request);
follower.updateLastRpcSendTime();
}
- private void timeoutAppendRequest(AppendEntriesRequestProto request) {
- AppendEntriesRequest pendingRequest =
pendingRequests.remove(request.getServerRequest().getCallId());
- if (pendingRequest != null) {
- LOG.warn( "{}: appendEntries Timeout, request={}", this,
- ServerProtoUtils.toString(pendingRequest.getRequestProto()));
+ private void timeoutAppendRequest(long callId) {
+ final AppendEntriesRequest pending = pendingRequests.remove(callId);
+ if (pending != null) {
+ LOG.warn( "{}: appendEntries Timeout, request={}", this, pending);
}
}
@@ -235,23 +236,22 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onNext(AppendEntriesReplyProto reply) {
final AppendEntriesRequest request =
pendingRequests.remove(reply.getServerReply().getCallId());
- AppendEntriesRequestProto requestProto = request.getRequestProto();
if (LOG.isDebugEnabled()) {
LOG.debug("{}: received {} reply {}, request={}",
this, firstResponseReceived? "a": "the first",
- ServerProtoUtils.toString(reply),
ServerProtoUtils.toString(requestProto));
+ ServerProtoUtils.toString(reply), request);
}
request.stopRequestTimer(); // Update completion time
try {
- onNextImpl(requestProto, reply);
+ onNextImpl(request, reply);
} catch(Throwable t) {
- LOG.error("Failed onNext request=" +
ServerProtoUtils.toString(requestProto)
+ LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerProtoUtils.toString(reply), t);
}
}
- private void onNextImpl(AppendEntriesRequestProto request,
AppendEntriesReplyProto reply) {
+ private void onNextImpl(AppendEntriesRequest request,
AppendEntriesReplyProto reply) {
// update the last rpc time
follower.updateLastRpcResponseTime();
@@ -300,7 +300,7 @@ public class GrpcLogAppender extends LogAppender {
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
long callId = GrpcUtil.getCallId(t);
- resetClient(pendingRequests.remove(callId).getRequestProto());
+ resetClient(pendingRequests.remove(callId));
}
@Override
@@ -315,10 +315,8 @@ public class GrpcLogAppender extends LogAppender {
}
}
- private boolean checkAndUpdateMatchIndex(AppendEntriesRequestProto request) {
- final int n = request.getEntriesCount();
- final long newMatchIndex = n == 0? request.getPreviousLog().getIndex():
request.getEntries(n - 1).getIndex();
- return follower.updateMatchIndex(newMatchIndex);
+ private boolean checkAndUpdateMatchIndex(AppendEntriesRequest request) {
+ return follower.updateMatchIndex(request.getNewMatchIndex());
}
private synchronized void updateNextIndex(long replyNextIndex) {
@@ -521,17 +519,36 @@ public class GrpcLogAppender extends LogAppender {
}
static class AppendEntriesRequest {
- private final AppendEntriesRequestProto requestProto;
private final Timer timer;
- private Timer.Context timerContext;
+ private volatile Timer.Context timerContext;
+
+ private final long callId;
+ private final TermIndex previousLog;
+ private final int entriesCount;
+
+ private final TermIndex lastEntry;
+
+ AppendEntriesRequest(AppendEntriesRequestProto proto, Timer timer) {
+ this.callId = proto.getServerRequest().getCallId();
+ this.previousLog = proto.hasPreviousLog()?
ServerProtoUtils.toTermIndex(proto.getPreviousLog()): null;
+ this.entriesCount = proto.getEntriesCount();
+ this.lastEntry = entriesCount > 0?
ServerProtoUtils.toTermIndex(proto.getEntries(entriesCount - 1)): null;
- AppendEntriesRequest(AppendEntriesRequestProto requestProto, Timer timer) {
- this.requestProto = requestProto;
this.timer = timer;
}
- AppendEntriesRequestProto getRequestProto() {
- return requestProto;
+ long getCallId() {
+ return callId;
+ }
+
+ TermIndex getPreviousLog() {
+ return previousLog;
+ }
+
+ long getNewMatchIndex() {
+ return lastEntry != null? lastEntry.getIndex()
+ : previousLog != null? previousLog.getIndex()
+ : 0;
}
void startRequestTimer() {
@@ -541,5 +558,10 @@ public class GrpcLogAppender extends LogAppender {
void stopRequestTimer() {
timerContext.stop();
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":cid=" + callId + ",entriesCount="
+ entriesCount + ",lastEntry=" + lastEntry;
+ }
}
}