This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c7028a  RATIS-518. Add request specific retry policy support.
4c7028a is described below

commit 4c7028a64e2e5253cf678d5cb02c9dca15adbfeb
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Apr 15 20:18:18 2019 +0800

    RATIS-518. Add request specific retry policy support.
---
 .../apache/ratis/client/impl/RaftClientImpl.java   |  31 +++---
 .../apache/ratis/client/impl/UnorderedAsync.java   |  11 +-
 .../org/apache/ratis/protocol/RaftClientReply.java |  12 +-
 .../apache/ratis/protocol/RaftClientRequest.java   |   2 +-
 .../ratis/protocol/RaftRetryFailureException.java  |   7 +-
 .../java/org/apache/ratis/retry/RetryPolicies.java | 123 ++++++++++++++-------
 .../java/org/apache/ratis/retry/RetryPolicy.java   |  16 +--
 .../test/java/org/apache/ratis/RaftAsyncTests.java |   7 +-
 .../java/org/apache/ratis/TestRetryPolicy.java     |  80 ++++++++++++--
 9 files changed, 199 insertions(+), 90 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f73e541..6450b19 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -315,8 +315,9 @@ final class RaftClientImpl implements RaftClient {
         return;
       }
       if (reply == null) {
-        LOG.debug("schedule* attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
-        scheduler.onTimeout(retryPolicy.getSleepTime(),
+        final int attempt = pending.getAttemptCount();
+        LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, retryPolicy, request);
+        scheduler.onTimeout(retryPolicy.getSleepTime(attempt, request),
             () -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
             LOG, () -> "Failed* to retry " + request);
       } else {
@@ -334,11 +335,11 @@ final class RaftClientImpl implements RaftClient {
       if (reply != null) {
         return reply;
       }
-      if (!retryPolicy.shouldRetry(attemptCount)) {
+      if (!retryPolicy.shouldRetry(attemptCount, request)) {
         return null;
       }
       try {
-        retryPolicy.getSleepTime().sleep();
+        retryPolicy.getSleepTime(attemptCount, request).sleep();
       } catch (InterruptedException e) {
         throw new InterruptedIOException("retry policy=" + retryPolicy);
       }
@@ -350,12 +351,13 @@ final class RaftClientImpl implements RaftClient {
     LOG.debug("{}: send* {}", clientId, request);
     return clientRpc.sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive* {}", clientId, reply);
+      final RaftException replyException = reply != null? reply.getException(): null;
       reply = handleNotLeaderException(request, reply, true);
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
             request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetryAsync);
-      } else if (!retryPolicy.shouldRetry(attemptCount)) {
-        handleAsyncRetryFailure(request, attemptCount);
+      } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
+        handleAsyncRetryFailure(request, attemptCount, replyException);
       }
       return reply;
     }).exceptionally(e -> {
@@ -366,8 +368,8 @@ final class RaftClientImpl implements RaftClient {
       }
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        if (!retryPolicy.shouldRetry(attemptCount)) {
-          handleAsyncRetryFailure(request, attemptCount);
+        if (!retryPolicy.shouldRetry(attemptCount, request)) {
+          handleAsyncRetryFailure(request, attemptCount, e);
         } else {
           handleIOException(request, (IOException) e, null, true);
         }
@@ -378,15 +380,16 @@ final class RaftClientImpl implements RaftClient {
     });
   }
 
-  static RaftRetryFailureException newRaftRetryFailureException(
-      RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy) {
+  static Throwable noMoreRetries(RaftClientRequest request, int attemptCount, RetryPolicy policy, Throwable throwable) {
+    if (attemptCount == 1 && throwable != null) {
+      return throwable;
+    }
     return new RaftRetryFailureException(
-        "Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy);
+        "Failed " + request + " for " + (attemptCount-1) + " attempts with " + policy, throwable);
   }
 
-  private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
-    final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
-    failAllAsyncRequests(request, rfe);
+  private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount, Throwable throwable) {
+    failAllAsyncRequests(request, noMoreRetries(request, attemptCount, retryPolicy, throwable));
   }
 
   private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 73e485c..8a831dc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
@@ -73,14 +74,16 @@ public interface UnorderedAsync {
     client.getClientRpc().sendRequestAsyncUnordered(request).whenCompleteAsync((reply, e) -> {
       try {
         LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, reply);
+        final RaftException replyException = reply != null? reply.getException(): null;
         reply = client.handleNotLeaderException(request, reply, false);
         if (reply != null) {
           f.complete(reply);
           return;
         }
         final RetryPolicy retryPolicy = client.getRetryPolicy();
-        if (!retryPolicy.shouldRetry(attemptCount)) {
-          f.completeExceptionally(RaftClientImpl.newRaftRetryFailureException(request, attemptCount, retryPolicy));
+        if (!retryPolicy.shouldRetry(attemptCount, request)) {
+          f.completeExceptionally(RaftClientImpl.noMoreRetries(
+              request, attemptCount, retryPolicy, replyException != null? replyException: e));
           return;
         }
 
@@ -107,8 +110,8 @@ public interface UnorderedAsync {
         }
 
         LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
-        client.getScheduler().onTimeout(retryPolicy.getSleepTime(), () -> sendRequestWithRetry(pending, client),
-            LOG, () -> clientId + ": Failed~ to retry " + request);
+        client.getScheduler().onTimeout(retryPolicy.getSleepTime(attemptCount, request),
+            () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request);
       } catch (Throwable t) {
         LOG.error(clientId + ": XXX Failed " + request, t);
         f.completeExceptionally(t);
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 826eeee..9539806 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -67,8 +67,8 @@ public class RaftClientReply extends RaftClientMessage {
       Preconditions.assertTrue(!success,
           () -> "Inconsistent parameters: success && exception != null: " + this);
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
-          NotLeaderException.class, NotReplicatedException.class, StateMachineException.class,
-          RaftRetryFailureException.class), () -> "Unexpected exception class: " + this);
+          NotLeaderException.class, NotReplicatedException.class, StateMachineException.class),
+          () -> "Unexpected exception class: " + this);
     }
   }
 
@@ -145,8 +145,8 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, StateMachineException.class);
   }
 
-  /** If this reply has {@link RaftRetryFailureException}, return it; otherwise return null. */
-  public RaftRetryFailureException getRetryFailureException() {
-    return JavaUtils.cast(exception, RaftRetryFailureException.class);
+  /** @return the exception, if there is any; otherwise, return null. */
+  public RaftException getException() {
+    return exception;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 18253fa..ae4e9ae 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -158,7 +158,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   private final SlidingWindowEntry slidingWindowEntry;
 
-  RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
+  public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
     this(clientId, serverId, groupId, callId, null, type, null);
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
index 690e96b..0294bb4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,8 +21,7 @@ package org.apache.ratis.protocol;
  * Retry failure as per the retryPolicy defined.
  */
 public class RaftRetryFailureException extends RaftException {
-
-  public RaftRetryFailureException(String msg) {
-    super(msg);
+  public RaftRetryFailureException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index e5cdeaa..c171c27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -17,34 +17,36 @@
  */
 package org.apache.ratis.retry;
 
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * A collection of {@link RetryPolicy} implementations
  */
 public interface RetryPolicies {
-  /**
-   * Keep retrying forever with zero sleep.
-   */
+  /** For any requests, keep retrying forever with no sleep between attempts. */
   static RetryPolicy retryForeverNoSleep() {
     return Constants.RETRY_FOREVER_NO_SLEEP;
   }
 
+  /** No retry. */
   static RetryPolicy noRetry() {
     return Constants.NO_RETRY;
   }
 
-  /**
-   * Keep retrying forever with fixed sleep.
-   */
-  static RetryPolicy retryForeverWithSleep(TimeDuration sleepTime) {
+  /** For any requests, keep retrying forever with a fixed sleep time between attempts. */
+  static RetryForeverWithSleep retryForeverWithSleep(TimeDuration sleepTime) {
     return new RetryForeverWithSleep(sleepTime);
   }
 
-  /**
-   * Keep trying a limited number of times, waiting a fixed time between attempts,
-   * and then fail by re-throwing the exception.
-   */
+  /** For any requests, keep retrying a limited number of attempts with a fixed sleep time between attempts. */
   static RetryLimited retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) {
     return new RetryLimited(maxAttempts, sleepTime);
   }
@@ -58,7 +60,7 @@ public interface RetryPolicies {
     private RetryForeverNoSleep() {}
 
     @Override
-    public boolean shouldRetry(int attemptCount) {
+    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
       return true;
     }
 
@@ -72,7 +74,7 @@ public interface RetryPolicies {
     private NoRetry() {}
 
     @Override
-    public boolean shouldRetry(int attemptCount) {
+    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
       return false;
     }
 
@@ -82,52 +84,39 @@ public interface RetryPolicies {
     }
   }
 
+  /** For any requests, keep retrying forever with a fixed sleep time between attempts. */
   class RetryForeverWithSleep implements RetryPolicy {
     private final TimeDuration sleepTime;
 
-    private String myString;
-
-    RetryForeverWithSleep(TimeDuration sleepTime) {
-      if (sleepTime.isNegative()) {
-        throw new IllegalArgumentException(
-            "sleepTime = " + sleepTime.getDuration() + " < 0");
-      }
+    private RetryForeverWithSleep(TimeDuration sleepTime) {
+      Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime.getDuration() + " < 0");
       this.sleepTime = sleepTime;
     }
 
     @Override
-    public TimeDuration getSleepTime() {
+    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest request) {
       return sleepTime;
     }
 
     @Override
-    public boolean shouldRetry(int attemptCount) {
+    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
       return true;
     }
 
     @Override
     public String toString() {
-      if (myString == null) {
-        myString = getClass().getSimpleName() + "(sleepTime = " + sleepTime + ")";
-      }
-      return myString;
+      return getClass().getSimpleName() + "(sleepTime = " + sleepTime + ")";
     }
   }
-  /**
-   * Retry up to maxAttempts.
-   * The actual sleep time of the n-th retry is f(n, sleepTime),
-   * where f is a function provided by the subclass implementation.
-   *
-   * The object of the subclasses should be immutable;
-   * otherwise, the subclass must override hashCode(), equals(..) and toString().
-   */
+
+  /** For any requests, keep retrying a limited number of attempts with a fixed sleep time between attempts. */
   class RetryLimited implements RetryPolicy {
     private final int maxAttempts;
     private final TimeDuration sleepTime;
 
     private String myString;
 
-    RetryLimited(int maxAttempts, TimeDuration sleepTime) {
+    private RetryLimited(int maxAttempts, TimeDuration sleepTime) {
       if (maxAttempts < 0) {
         throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" < 0");
       }
@@ -141,8 +130,8 @@ public interface RetryPolicies {
     }
 
     @Override
-    public TimeDuration getSleepTime() {
-      return sleepTime;
+    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest request) {
+      return shouldRetry(attemptCount, request)? sleepTime: ZERO_MILLIS;
     }
 
     public int getMaxAttempts() {
@@ -150,8 +139,8 @@ public interface RetryPolicies {
     }
 
     @Override
-    public boolean shouldRetry(int attemptCount) {
-      return attemptCount <= maxAttempts;
+    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
+      return attemptCount < maxAttempts;
     }
 
     @Override
@@ -163,4 +152,60 @@ public interface RetryPolicies {
       return myString;
     }
   }
+
+  /**
+   * A {@link RaftClientRequest.Type} dependent {@link RetryPolicy}
+   * such that each type can be set to use an individual policy.
+   * When the policy is not set for a particular type,
+   * the {@link #retryForeverNoSleep()} policy is used as the default.
+   */
+  class RequestTypeDependentRetry implements RetryPolicy {
+    public static class Builder {
+      private final EnumMap<RaftClientRequestProto.TypeCase, RetryPolicy> map
+          = new EnumMap<>(RaftClientRequestProto.TypeCase.class);
+
+      /** Set the given policy for the given type. */
+      public Builder set(RaftClientRequestProto.TypeCase type, RetryPolicy policy) {
+        final RetryPolicy previous = map.put(type, policy);
+        Preconditions.assertNull(previous, () -> "The type " + type + " is already set to " + previous);
+        return this;
+      }
+
+      public RequestTypeDependentRetry build() {
+        return new RequestTypeDependentRetry(map);
+      }
+    }
+
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    private final Map<RaftClientRequestProto.TypeCase, RetryPolicy> map;
+
+    private RequestTypeDependentRetry(EnumMap<RaftClientRequestProto.TypeCase, RetryPolicy> map) {
+      this.map = Collections.unmodifiableMap(map);
+    }
+
+    @Override
+    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
+      return Optional.ofNullable(map.get(request.getType().getTypeCase()))
+          .orElse(retryForeverNoSleep())
+          .shouldRetry(attemptCount, request);
+    }
+
+    @Override
+    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest request) {
+      return Optional.ofNullable(map.get(request.getType().getTypeCase()))
+          .orElse(retryForeverNoSleep())
+          .getSleepTime(attemptCount, request);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName()).append("{");
+      map.forEach((key, value) -> b.append(key).append("->").append(value).append(", "));
+      b.setLength(b.length() - 2);
+      return b.append("}").toString();
+    }
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index ba90435..162f760 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.retry;
 
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
@@ -28,18 +29,19 @@ public interface RetryPolicy {
   TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
 
   /**
-   * Determines whether it is supposed to retry the connection if the operation
-   * fails for some reason.
+   * Determines whether it is supposed to retry after the operation has failed.
    *
-   * @param attemptCount The number of times attempted so far
-   * @return true if it has to make another attempt, otherwise, false
+   * @param attemptCount The number of times attempted so far.
+   * @param request The failed request.
+   * @return true if it has to make another attempt; otherwise, return false.
    */
-  boolean shouldRetry(int attemptCount);
+  boolean shouldRetry(int attemptCount, RaftClientRequest request);
 
   /**
-   * Returns the time duration for sleep in between the retries.
+   * @param attemptCount The number of times attempted so far.
+   * @return the {@link TimeDuration} to sleep in between the retries.
    */
-  default TimeDuration getSleepTime() {
+  default TimeDuration getSleepTime(int attemptCount, RaftClientRequest request) {
     return ZERO_MILLIS;
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3821f5f..aa9893a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -116,7 +116,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   }
 
   void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluster) throws Exception {
-    final RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, HUNDRED_MILLIS);
+    final TimeDuration sleepTime = HUNDRED_MILLIS;
+    final RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, sleepTime);
 
     try(final RaftClient client = cluster.createClient(null, retryPolicy)) {
       RaftPeerId leader = null;
@@ -147,7 +148,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
         }
 
         // sleep most of the retry time
-        retryPolicy.getSleepTime().apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep();
+        sleepTime.apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep();
 
         // send another half of the calls without starting the cluster
         for (; i < messages.length; i++) {
@@ -158,7 +159,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
 
       // sleep again so that the first half calls will fail retries.
       // the second half still have retry time remaining.
-      retryPolicy.getSleepTime().apply(t -> t*2).sleep();
+      sleepTime.apply(t -> t*2).sleep();
 
       if (leader != null) {
         cluster.restartServer(leader, false);
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
index ff947d4..4049f6b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,26 +17,82 @@
  */
 package org.apache.ratis;
 
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
 
-
-
-public class TestRetryPolicy {
+public class TestRetryPolicy extends BaseTest {
 
   @Test
   public void testRetryMultipleTimesWithFixedSleep() {
-    RetryPolicy retryPolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(2,
-            TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS));
-     boolean shouldRetry = retryPolicy.shouldRetry(1);
-    Assert.assertTrue(shouldRetry);
-    Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration());
-    Assert.assertFalse(retryPolicy.shouldRetry(3));
+    final int n = 4;
+    final TimeDuration sleepTime = HUNDRED_MILLIS;
+    final RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, sleepTime);
+    final RaftClientRequest request = newRaftClientRequest(RaftClientRequest.readRequestType());
+    for(int i = 1; i < 2*n; i++) {
+      final boolean expected = i < n;
+      Assert.assertEquals(expected, policy.shouldRetry(i, request));
+      if (expected) {
+        Assert.assertEquals(sleepTime, policy.getSleepTime(i, request));
+      } else {
+        Assert.assertEquals(0L, policy.getSleepTime(i, request).getDuration());
+      }
+    }
+  }
+
+  @Test
+  public void testRequestTypeDependentRetry() {
+    final RetryPolicies.RequestTypeDependentRetry.Builder b = RetryPolicies.RequestTypeDependentRetry.newBuilder();
+    final int n = 4;
+    final TimeDuration writeSleep = HUNDRED_MILLIS;
+    final RetryPolicies.RetryLimited writePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, writeSleep);
+    b.set(RaftClientRequestProto.TypeCase.WRITE, writePolicy);
+    b.set(RaftClientRequestProto.TypeCase.WATCH, RetryPolicies.noRetry());
+    final RetryPolicy policy = b.build();
+    LOG.info("policy = {}", policy);
+
+    final RaftClientRequest staleReadRequest = newRaftClientRequest(RaftClientRequest.staleReadRequestType(1));
+    final RaftClientRequest readRequest = newRaftClientRequest(RaftClientRequest.readRequestType());
+    final RaftClientRequest writeRequest = newRaftClientRequest(RaftClientRequest.writeRequestType());
+    final RaftClientRequest watchRequest = newRaftClientRequest(
+        RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
+    for(int i = 1; i < 2*n; i++) {
+      { //write
+        final boolean expected = i < n;
+        Assert.assertEquals(expected, policy.shouldRetry(i, writeRequest));
+        if (expected) {
+          Assert.assertEquals(writeSleep, policy.getSleepTime(i, writeRequest));
+        } else {
+          Assert.assertEquals(0L, policy.getSleepTime(i, writeRequest).getDuration());
+        }
+      }
+
+      { //read and stale read are using default
+        Assert.assertTrue(policy.shouldRetry(i, readRequest));
+        Assert.assertEquals(0L, policy.getSleepTime(i, readRequest).getDuration());
+
+        Assert.assertTrue(policy.shouldRetry(i, staleReadRequest));
+        Assert.assertEquals(0L, policy.getSleepTime(i, staleReadRequest).getDuration());
+      }
+
+      { //watch has no retry
+        Assert.assertFalse(policy.shouldRetry(i, watchRequest));
+        Assert.assertEquals(0L, policy.getSleepTime(i, watchRequest).getDuration());
+      }
+    }
+
+  }
+
+  private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type type) {
+    return new RaftClientRequest(ClientId.randomId(), RaftPeerId.valueOf("s0"), RaftGroupId.randomId(), 1L, type);
   }
 }
\ No newline at end of file

Reply via email to