This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4c7028a RATIS-518. Add request specific retry policy support.
4c7028a is described below
commit 4c7028a64e2e5253cf678d5cb02c9dca15adbfeb
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Apr 15 20:18:18 2019 +0800
RATIS-518. Add request specific retry policy support.
---
.../apache/ratis/client/impl/RaftClientImpl.java | 31 +++---
.../apache/ratis/client/impl/UnorderedAsync.java | 11 +-
.../org/apache/ratis/protocol/RaftClientReply.java | 12 +-
.../apache/ratis/protocol/RaftClientRequest.java | 2 +-
.../ratis/protocol/RaftRetryFailureException.java | 7 +-
.../java/org/apache/ratis/retry/RetryPolicies.java | 123 ++++++++++++++-------
.../java/org/apache/ratis/retry/RetryPolicy.java | 16 +--
.../test/java/org/apache/ratis/RaftAsyncTests.java | 7 +-
.../java/org/apache/ratis/TestRetryPolicy.java | 80 ++++++++++++--
9 files changed, 199 insertions(+), 90 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f73e541..6450b19 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -315,8 +315,9 @@ final class RaftClientImpl implements RaftClient {
return;
}
if (reply == null) {
- LOG.debug("schedule* attempt #{} with policy {} for {}",
pending.getAttemptCount(), retryPolicy, request);
- scheduler.onTimeout(retryPolicy.getSleepTime(),
+ final int attempt = pending.getAttemptCount();
+ LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
+ scheduler.onTimeout(retryPolicy.getSleepTime(attempt, request),
() -> getSlidingWindow(request).retry(pending,
this::sendRequestWithRetryAsync),
LOG, () -> "Failed* to retry " + request);
} else {
@@ -334,11 +335,11 @@ final class RaftClientImpl implements RaftClient {
if (reply != null) {
return reply;
}
- if (!retryPolicy.shouldRetry(attemptCount)) {
+ if (!retryPolicy.shouldRetry(attemptCount, request)) {
return null;
}
try {
- retryPolicy.getSleepTime().sleep();
+ retryPolicy.getSleepTime(attemptCount, request).sleep();
} catch (InterruptedException e) {
throw new InterruptedIOException("retry policy=" + retryPolicy);
}
@@ -350,12 +351,13 @@ final class RaftClientImpl implements RaftClient {
LOG.debug("{}: send* {}", clientId, request);
return clientRpc.sendRequestAsync(request).thenApply(reply -> {
LOG.debug("{}: receive* {}", clientId, reply);
+ final RaftException replyException = reply != null?
reply.getException(): null;
reply = handleNotLeaderException(request, reply, true);
if (reply != null) {
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetryAsync);
- } else if (!retryPolicy.shouldRetry(attemptCount)) {
- handleAsyncRetryFailure(request, attemptCount);
+ } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
+ handleAsyncRetryFailure(request, attemptCount, replyException);
}
return reply;
}).exceptionally(e -> {
@@ -366,8 +368,8 @@ final class RaftClientImpl implements RaftClient {
}
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- if (!retryPolicy.shouldRetry(attemptCount)) {
- handleAsyncRetryFailure(request, attemptCount);
+ if (!retryPolicy.shouldRetry(attemptCount, request)) {
+ handleAsyncRetryFailure(request, attemptCount, e);
} else {
handleIOException(request, (IOException) e, null, true);
}
@@ -378,15 +380,16 @@ final class RaftClientImpl implements RaftClient {
});
}
- static RaftRetryFailureException newRaftRetryFailureException(
- RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy) {
+ static Throwable noMoreRetries(RaftClientRequest request, int attemptCount,
RetryPolicy policy, Throwable throwable) {
+ if (attemptCount == 1 && throwable != null) {
+ return throwable;
+ }
return new RaftRetryFailureException(
- "Failed " + request + " for " + (attemptCount-1) + " attempts with " +
retryPolicy);
+ "Failed " + request + " for " + (attemptCount-1) + " attempts with " +
policy, throwable);
}
- private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount) {
- final RaftRetryFailureException rfe =
newRaftRetryFailureException(request, attemptCount, retryPolicy);
- failAllAsyncRequests(request, rfe);
+ private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount, Throwable throwable) {
+ failAllAsyncRequests(request, noMoreRetries(request, attemptCount,
retryPolicy, throwable));
}
private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 73e485c..8a831dc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
@@ -73,14 +74,16 @@ public interface UnorderedAsync {
client.getClientRpc().sendRequestAsyncUnordered(request).whenCompleteAsync((reply,
e) -> {
try {
LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount,
reply);
+ final RaftException replyException = reply != null?
reply.getException(): null;
reply = client.handleNotLeaderException(request, reply, false);
if (reply != null) {
f.complete(reply);
return;
}
final RetryPolicy retryPolicy = client.getRetryPolicy();
- if (!retryPolicy.shouldRetry(attemptCount)) {
- f.completeExceptionally(RaftClientImpl.newRaftRetryFailureException(request, attemptCount, retryPolicy));
+ if (!retryPolicy.shouldRetry(attemptCount, request)) {
+ f.completeExceptionally(RaftClientImpl.noMoreRetries(
+ request, attemptCount, retryPolicy, replyException != null?
replyException: e));
return;
}
@@ -107,8 +110,8 @@ public interface UnorderedAsync {
}
LOG.debug("schedule retry for attempt #{}, policy={}, request={}",
attemptCount, retryPolicy, request);
- client.getScheduler().onTimeout(retryPolicy.getSleepTime(), () ->
sendRequestWithRetry(pending, client),
- LOG, () -> clientId + ": Failed~ to retry " + request);
+ client.getScheduler().onTimeout(retryPolicy.getSleepTime(attemptCount,
request),
+ () -> sendRequestWithRetry(pending, client), LOG, () -> clientId +
": Failed~ to retry " + request);
} catch (Throwable t) {
LOG.error(clientId + ": XXX Failed " + request, t);
f.completeExceptionally(t);
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 826eeee..9539806 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -67,8 +67,8 @@ public class RaftClientReply extends RaftClientMessage {
Preconditions.assertTrue(!success,
() -> "Inconsistent parameters: success && exception != null: " +
this);
Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
- NotLeaderException.class, NotReplicatedException.class,
StateMachineException.class,
- RaftRetryFailureException.class), () -> "Unexpected exception class:
" + this);
+ NotLeaderException.class, NotReplicatedException.class,
StateMachineException.class),
+ () -> "Unexpected exception class: " + this);
}
}
@@ -145,8 +145,8 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, StateMachineException.class);
}
- /** If this reply has {@link RaftRetryFailureException}, return it; otherwise return null. */
- public RaftRetryFailureException getRetryFailureException() {
- return JavaUtils.cast(exception, RaftRetryFailureException.class);
+ /** @return the exception, if there is any; otherwise, return null. */
+ public RaftException getException() {
+ return exception;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 18253fa..ae4e9ae 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -158,7 +158,7 @@ public class RaftClientRequest extends RaftClientMessage {
private final SlidingWindowEntry slidingWindowEntry;
- RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId
groupId, long callId, Type type) {
+ public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId
groupId, long callId, Type type) {
this(clientId, serverId, groupId, callId, null, type, null);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
index 690e96b..0294bb4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,8 +21,7 @@ package org.apache.ratis.protocol;
* Retry failure as per the retryPolicy defined.
*/
public class RaftRetryFailureException extends RaftException {
-
- public RaftRetryFailureException(String msg) {
- super(msg);
+ public RaftRetryFailureException(String message, Throwable cause) {
+ super(message, cause);
}
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index e5cdeaa..c171c27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -17,34 +17,36 @@
*/
package org.apache.ratis.retry;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Optional;
+
/**
* A collection of {@link RetryPolicy} implementations
*/
public interface RetryPolicies {
- /**
- * Keep retrying forever with zero sleep.
- */
+ /** For any requests, keep retrying forever with no sleep between attempts. */
static RetryPolicy retryForeverNoSleep() {
return Constants.RETRY_FOREVER_NO_SLEEP;
}
+ /** No retry. */
static RetryPolicy noRetry() {
return Constants.NO_RETRY;
}
- /**
- * Keep retrying forever with fixed sleep.
- */
- static RetryPolicy retryForeverWithSleep(TimeDuration sleepTime) {
+ /** For any requests, keep retrying forever with a fixed sleep time between attempts. */
+ static RetryForeverWithSleep retryForeverWithSleep(TimeDuration sleepTime) {
return new RetryForeverWithSleep(sleepTime);
}
- /**
- * Keep trying a limited number of times, waiting a fixed time between
attempts,
- * and then fail by re-throwing the exception.
- */
+ /** For any requests, keep retrying a limited number of attempts with a fixed sleep time between attempts. */
static RetryLimited retryUpToMaximumCountWithFixedSleep(int maxAttempts,
TimeDuration sleepTime) {
return new RetryLimited(maxAttempts, sleepTime);
}
@@ -58,7 +60,7 @@ public interface RetryPolicies {
private RetryForeverNoSleep() {}
@Override
- public boolean shouldRetry(int attemptCount) {
+ public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
return true;
}
@@ -72,7 +74,7 @@ public interface RetryPolicies {
private NoRetry() {}
@Override
- public boolean shouldRetry(int attemptCount) {
+ public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
return false;
}
@@ -82,52 +84,39 @@ public interface RetryPolicies {
}
}
+ /** For any requests, keep retrying forever with a fixed sleep time between attempts. */
class RetryForeverWithSleep implements RetryPolicy {
private final TimeDuration sleepTime;
- private String myString;
-
- RetryForeverWithSleep(TimeDuration sleepTime) {
- if (sleepTime.isNegative()) {
- throw new IllegalArgumentException(
- "sleepTime = " + sleepTime.getDuration() + " < 0");
- }
+ private RetryForeverWithSleep(TimeDuration sleepTime) {
+ Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " +
sleepTime.getDuration() + " < 0");
this.sleepTime = sleepTime;
}
@Override
- public TimeDuration getSleepTime() {
+ public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
return sleepTime;
}
@Override
- public boolean shouldRetry(int attemptCount) {
+ public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
return true;
}
@Override
public String toString() {
- if (myString == null) {
- myString = getClass().getSimpleName() + "(sleepTime = " + sleepTime +
")";
- }
- return myString;
+ return getClass().getSimpleName() + "(sleepTime = " + sleepTime + ")";
}
}
- /**
- * Retry up to maxAttempts.
- * The actual sleep time of the n-th retry is f(n, sleepTime),
- * where f is a function provided by the subclass implementation.
- *
- * The object of the subclasses should be immutable;
- * otherwise, the subclass must override hashCode(), equals(..) and
toString().
- */
+
+ /** For any requests, keep retrying a limited number of attempts with a fixed sleep time between attempts. */
class RetryLimited implements RetryPolicy {
private final int maxAttempts;
private final TimeDuration sleepTime;
private String myString;
- RetryLimited(int maxAttempts, TimeDuration sleepTime) {
+ private RetryLimited(int maxAttempts, TimeDuration sleepTime) {
if (maxAttempts < 0) {
throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" <
0");
}
@@ -141,8 +130,8 @@ public interface RetryPolicies {
}
@Override
- public TimeDuration getSleepTime() {
- return sleepTime;
+ public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
+ return shouldRetry(attemptCount, request)? sleepTime: ZERO_MILLIS;
}
public int getMaxAttempts() {
@@ -150,8 +139,8 @@ public interface RetryPolicies {
}
@Override
- public boolean shouldRetry(int attemptCount) {
- return attemptCount <= maxAttempts;
+ public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
+ return attemptCount < maxAttempts;
}
@Override
@@ -163,4 +152,60 @@ public interface RetryPolicies {
return myString;
}
}
+
+ /**
+ * A {@link RaftClientRequest.Type} dependent {@link RetryPolicy}
+ * such that each type can be set to use an individual policy.
+ * When the policy is not set for a particular type,
+ * the {@link #retryForeverNoSleep()} policy is used as the default.
+ */
+ class RequestTypeDependentRetry implements RetryPolicy {
+ public static class Builder {
+ private final EnumMap<RaftClientRequestProto.TypeCase, RetryPolicy> map
+ = new EnumMap<>(RaftClientRequestProto.TypeCase.class);
+
+ /** Set the given policy for the given type. */
+ public Builder set(RaftClientRequestProto.TypeCase type, RetryPolicy
policy) {
+ final RetryPolicy previous = map.put(type, policy);
+ Preconditions.assertNull(previous, () -> "The type " + type + " is
already set to " + previous);
+ return this;
+ }
+
+ public RequestTypeDependentRetry build() {
+ return new RequestTypeDependentRetry(map);
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private final Map<RaftClientRequestProto.TypeCase, RetryPolicy> map;
+
+ private RequestTypeDependentRetry(EnumMap<RaftClientRequestProto.TypeCase,
RetryPolicy> map) {
+ this.map = Collections.unmodifiableMap(map);
+ }
+
+ @Override
+ public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
+ return Optional.ofNullable(map.get(request.getType().getTypeCase()))
+ .orElse(retryForeverNoSleep())
+ .shouldRetry(attemptCount, request);
+ }
+
+ @Override
+ public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
+ return Optional.ofNullable(map.get(request.getType().getTypeCase()))
+ .orElse(retryForeverNoSleep())
+ .getSleepTime(attemptCount, request);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder b = new
StringBuilder(getClass().getSimpleName()).append("{");
+ map.forEach((key, value) ->
b.append(key).append("->").append(value).append(", "));
+ b.setLength(b.length() - 2);
+ return b.append("}").toString();
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index ba90435..162f760 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.retry;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
@@ -28,18 +29,19 @@ public interface RetryPolicy {
TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
/**
- * Determines whether it is supposed to retry the connection if the operation
- * fails for some reason.
+ * Determines whether it is supposed to retry after the operation has failed.
*
- * @param attemptCount The number of times attempted so far
- * @return true if it has to make another attempt, otherwise, false
+ * @param attemptCount The number of times attempted so far.
+ * @param request The failed request.
+ * @return true if it has to make another attempt; otherwise, return false.
*/
- boolean shouldRetry(int attemptCount);
+ boolean shouldRetry(int attemptCount, RaftClientRequest request);
/**
- * Returns the time duration for sleep in between the retries.
+ * @param attemptCount The number of times attempted so far.
+ * @return the {@link TimeDuration} to sleep in between the retries.
*/
- default TimeDuration getSleepTime() {
+ default TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
return ZERO_MILLIS;
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3821f5f..aa9893a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -116,7 +116,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER
cluster) throws Exception {
- final RetryLimited retryPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, HUNDRED_MILLIS);
+ final TimeDuration sleepTime = HUNDRED_MILLIS;
+ final RetryLimited retryPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, sleepTime);
try(final RaftClient client = cluster.createClient(null, retryPolicy)) {
RaftPeerId leader = null;
@@ -147,7 +148,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
// sleep most of the retry time
- retryPolicy.getSleepTime().apply(t -> t *
(retryPolicy.getMaxAttempts() - 1)).sleep();
+ sleepTime.apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep();
// send another half of the calls without starting the cluster
for (; i < messages.length; i++) {
@@ -158,7 +159,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// sleep again so that the first half calls will fail retries.
// the second half still have retry time remaining.
- retryPolicy.getSleepTime().apply(t -> t*2).sleep();
+ sleepTime.apply(t -> t*2).sleep();
if (leader != null) {
cluster.restartServer(leader, false);
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
index ff947d4..4049f6b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,26 +17,82 @@
*/
package org.apache.ratis;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
-
-public class TestRetryPolicy {
+public class TestRetryPolicy extends BaseTest {
@Test
public void testRetryMultipleTimesWithFixedSleep() {
- RetryPolicy retryPolicy = RetryPolicies
- .retryUpToMaximumCountWithFixedSleep(2,
- TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS));
- boolean shouldRetry = retryPolicy.shouldRetry(1);
- Assert.assertTrue(shouldRetry);
- Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration());
- Assert.assertFalse(retryPolicy.shouldRetry(3));
+ final int n = 4;
+ final TimeDuration sleepTime = HUNDRED_MILLIS;
+ final RetryPolicy policy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, sleepTime);
+ final RaftClientRequest request =
newRaftClientRequest(RaftClientRequest.readRequestType());
+ for(int i = 1; i < 2*n; i++) {
+ final boolean expected = i < n;
+ Assert.assertEquals(expected, policy.shouldRetry(i, request));
+ if (expected) {
+ Assert.assertEquals(sleepTime, policy.getSleepTime(i, request));
+ } else {
+ Assert.assertEquals(0L, policy.getSleepTime(i, request).getDuration());
+ }
+ }
+ }
+
+ @Test
+ public void testRequestTypeDependentRetry() {
+ final RetryPolicies.RequestTypeDependentRetry.Builder b =
RetryPolicies.RequestTypeDependentRetry.newBuilder();
+ final int n = 4;
+ final TimeDuration writeSleep = HUNDRED_MILLIS;
+ final RetryPolicies.RetryLimited writePolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, writeSleep);
+ b.set(RaftClientRequestProto.TypeCase.WRITE, writePolicy);
+ b.set(RaftClientRequestProto.TypeCase.WATCH, RetryPolicies.noRetry());
+ final RetryPolicy policy = b.build();
+ LOG.info("policy = {}", policy);
+
+ final RaftClientRequest staleReadRequest =
newRaftClientRequest(RaftClientRequest.staleReadRequestType(1));
+ final RaftClientRequest readRequest =
newRaftClientRequest(RaftClientRequest.readRequestType());
+ final RaftClientRequest writeRequest =
newRaftClientRequest(RaftClientRequest.writeRequestType());
+ final RaftClientRequest watchRequest = newRaftClientRequest(
+ RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
+ for(int i = 1; i < 2*n; i++) {
+ { //write
+ final boolean expected = i < n;
+ Assert.assertEquals(expected, policy.shouldRetry(i, writeRequest));
+ if (expected) {
+ Assert.assertEquals(writeSleep, policy.getSleepTime(i,
writeRequest));
+ } else {
+ Assert.assertEquals(0L, policy.getSleepTime(i,
writeRequest).getDuration());
+ }
+ }
+
+ { //read and stale read are using default
+ Assert.assertTrue(policy.shouldRetry(i, readRequest));
+ Assert.assertEquals(0L, policy.getSleepTime(i,
readRequest).getDuration());
+
+ Assert.assertTrue(policy.shouldRetry(i, staleReadRequest));
+ Assert.assertEquals(0L, policy.getSleepTime(i,
staleReadRequest).getDuration());
+ }
+
+ { //watch has no retry
+ Assert.assertFalse(policy.shouldRetry(i, watchRequest));
+ Assert.assertEquals(0L, policy.getSleepTime(i,
watchRequest).getDuration());
+ }
+ }
+
+ }
+
+ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type
type) {
+ return new RaftClientRequest(ClientId.randomId(),
RaftPeerId.valueOf("s0"), RaftGroupId.randomId(), 1L, type);
}
}
\ No newline at end of file