This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 55cbfbb  RATIS-726. TimeoutScheduler holds on to the raftClientRequest 
till it times out even though request succeeds. Contributed by Tsz-wo Sze.
55cbfbb is described below

commit 55cbfbbca68aca531bc261786a34499fce4de700
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Wed Oct 23 17:21:00 2019 +0530

    RATIS-726. TimeoutScheduler holds on to the raftClientRequest till it times 
out even though request succeeds. Contributed by Tsz-wo Sze.
---
 .../org/apache/ratis/client/impl/OrderedAsync.java | 49 ++++++++++++++--------
 1 file changed, 31 insertions(+), 18 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index efd26a1..7694450 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -36,43 +36,45 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongFunction;
 
 /** Send ordered asynchronous requests to a raft service. */
 class OrderedAsync {
-  static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(OrderedAsync.class);
 
   static class PendingOrderedRequest extends PendingClientRequest
       implements SlidingWindow.ClientSideRequest<RaftClientReply> {
-    private final Function<SlidingWindowEntry, RaftClientRequest> 
requestConstructor;
+    private final long callId;
     private final long seqNum;
+    private final AtomicReference<Function<SlidingWindowEntry, 
RaftClientRequest>> requestConstructor;
     private volatile boolean isFirst = false;
-    private volatile RaftClientRequest request;
 
-    PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry, 
RaftClientRequest> requestConstructor) {
+    PendingOrderedRequest(long callId, long seqNum,
+        Function<SlidingWindowEntry, RaftClientRequest> requestConstructor) {
+      this.callId = callId;
       this.seqNum = seqNum;
-      this.requestConstructor = requestConstructor;
+      this.requestConstructor = new AtomicReference<>(requestConstructor);
     }
 
     @Override
     RaftClientRequest newRequestImpl() {
-      request = 
requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
-      return request;
-    }
-
-    RaftClientRequest getRequest() {
-      return request;
+      return Optional.ofNullable(requestConstructor.get())
+          .map(f -> f.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst)))
+          .orElse(null);
     }
 
     @Override
@@ -92,17 +94,19 @@ class OrderedAsync {
 
     @Override
     public void setReply(RaftClientReply reply) {
+      requestConstructor.set(null);
       getReplyFuture().complete(reply);
     }
 
     @Override
     public void fail(Throwable e) {
+      requestConstructor.set(null);
       getReplyFuture().completeExceptionally(e);
     }
 
     @Override
     public String toString() {
-      return "[seq=" + getSeqNum() + "]";
+      return "[cid=" + callId + ", seq=" + getSeqNum() + "]";
     }
   }
 
@@ -150,7 +154,7 @@ class OrderedAsync {
     }
 
     final long callId = RaftClientImpl.nextCallId();
-    final LongFunction<PendingOrderedRequest> constructor = seqNum -> new 
PendingOrderedRequest(seqNum,
+    final LongFunction<PendingOrderedRequest> constructor = seqNum -> new 
PendingOrderedRequest(callId, seqNum,
         slidingWindowEntry -> client.newRaftClientRequest(server, callId, 
message, type, slidingWindowEntry));
     return getSlidingWindow(server).submitNewRequest(constructor, 
this::sendRequestWithRetry
     ).getReplyFuture(
@@ -159,13 +163,17 @@ class OrderedAsync {
   }
 
   private void sendRequestWithRetry(PendingOrderedRequest pending) {
-    final RetryPolicy retryPolicy = client.getRetryPolicy();
     final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
     if (f.isDone()) {
       return;
     }
 
-    RaftClientRequest request = pending.newRequestImpl();
+    final RaftClientRequest request = pending.newRequestImpl();
+    if (request == null) { // already done
+      return;
+    }
+
+    final RetryPolicy retryPolicy = client.getRetryPolicy();
     sendRequest(pending).thenAccept(reply -> {
       if (f.isDone()) {
         return;
@@ -191,9 +199,14 @@ class OrderedAsync {
   private void scheduleWithTimeout(PendingOrderedRequest pending, 
RaftClientRequest request, RetryPolicy retryPolicy) {
     final int attempt = pending.getAttemptCount();
     LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
-    client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, request),
-        () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetry),
-        LOG, () -> "Failed* to retry " + request);
+    final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
+    scheduleWithTimeout(pending, request.getServerId(), sleepTime);
+  }
+
+  private void scheduleWithTimeout(PendingOrderedRequest pending, RaftPeerId 
serverId, TimeDuration sleepTime) {
+    client.getScheduler().onTimeout(sleepTime,
+        () -> getSlidingWindow(serverId).retry(pending, 
this::sendRequestWithRetry),
+        LOG, () -> "Failed* to retry " + pending);
   }
 
   private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest 
pending) {

Reply via email to