This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 231b35c RATIS-565. Move the ordered async client code to a new class.
Contributed by Tsz Wo Nicholas Sze.
231b35c is described below
commit 231b35c23d44a0afb889f0c6bb5ba9194b3cf398
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed May 29 14:27:47 2019 +0530
RATIS-565. Move the ordered async client code to a new class. Contributed
by Tsz Wo Nicholas Sze.
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 216 +++++++++++++++++++++
.../apache/ratis/client/impl/RaftClientImpl.java | 204 ++++---------------
.../ratis/client/impl/RaftClientTestUtil.java | 5 +-
.../apache/ratis/client/impl/UnorderedAsync.java | 10 +-
4 files changed, 259 insertions(+), 176 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
new file mode 100644
index 0000000..4b5991c
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.function.FunctionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/** Send ordered asynchronous requests to a raft service. */
+class OrderedAsync {
+ static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+
+ static class PendingOrderedRequest extends PendingClientRequest
+ implements SlidingWindow.ClientSideRequest<RaftClientReply> {
+ private final Function<SlidingWindowEntry, RaftClientRequest>
requestConstructor;
+ private final long seqNum;
+ private volatile boolean isFirst = false;
+
+ PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry,
RaftClientRequest> requestConstructor) {
+ this.seqNum = seqNum;
+ this.requestConstructor = requestConstructor;
+ }
+
+ @Override
+ RaftClientRequest newRequestImpl() {
+ return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum,
isFirst));
+ }
+
+ @Override
+ public void setFirstRequest() {
+ isFirst = true;
+ }
+
+ @Override
+ public long getSeqNum() {
+ return seqNum;
+ }
+
+ @Override
+ public boolean hasReply() {
+ return getReplyFuture().isDone();
+ }
+
+ @Override
+ public void setReply(RaftClientReply reply) {
+ getReplyFuture().complete(reply);
+ }
+
+ @Override
+ public void fail(Throwable e) {
+ getReplyFuture().completeExceptionally(e);
+ }
+
+ @Override
+ public String toString() {
+ return "[seq=" + getSeqNum() + "]";
+ }
+ }
+
+ private final RaftClientImpl client;
+ /** Map: id -> {@link SlidingWindow}, in order to support async calls to the
Raft service or individual servers. */
+ private final ConcurrentMap<String,
SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>> slidingWindows
+ = new ConcurrentHashMap<>();
+ private final Semaphore requestSemaphore;
+
+ OrderedAsync(RaftClientImpl client, RaftProperties properties) {
+ this.client = Objects.requireNonNull(client, "client == null");
+ this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
+ }
+
+ private void resetSlidingWindow(RaftClientRequest request) {
+ getSlidingWindow(request).resetFirstSeqNum();
+ }
+
+ private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>
getSlidingWindow(RaftClientRequest request) {
+ return getSlidingWindow(request.is(TypeCase.STALEREAD) ?
request.getServerId() : null);
+ }
+
+ private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>
getSlidingWindow(RaftPeerId target) {
+ final String id = target != null ? target.toString() : "RAFT";
+ return slidingWindows.computeIfAbsent(id, key -> new
SlidingWindow.Client<>(client.getId() + "->" + key));
+ }
+
+ private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
+
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
+ }
+
+ private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount, Throwable throwable) {
+ failAllAsyncRequests(request, client.noMoreRetries(request, attemptCount,
throwable));
+ }
+
+ CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message
message, RaftPeerId server) {
+ if (!type.is(TypeCase.WATCH)) {
+ Objects.requireNonNull(message, "message == null");
+ }
+ try {
+ requestSemaphore.acquire();
+ } catch (InterruptedException e) {
+ return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
+ "Interrupted when sending " + type + ", message=" + message, e));
+ }
+
+ final long callId = RaftClientImpl.nextCallId();
+ final LongFunction<PendingOrderedRequest> constructor = seqNum -> new
PendingOrderedRequest(seqNum,
+ slidingWindowEntry -> client.newRaftClientRequest(server, callId,
message, type, slidingWindowEntry));
+ return getSlidingWindow(server).submitNewRequest(constructor,
this::sendRequestWithRetry
+ ).getReplyFuture(
+ ).thenApply(reply -> RaftClientImpl.handleRaftException(reply,
CompletionException::new)
+ ).whenComplete((r, e) -> requestSemaphore.release());
+ }
+
+ private void sendRequestWithRetry(PendingOrderedRequest pending) {
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+ if (f.isDone()) {
+ return;
+ }
+
+ final RaftClientRequest request = pending.newRequest();
+ sendRequest(request, pending.getAttemptCount()).thenAccept(reply -> {
+ if (f.isDone()) {
+ return;
+ }
+ if (reply == null) {
+ final int attempt = pending.getAttemptCount();
+ LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
+ client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt,
request),
+ () -> getSlidingWindow(request).retry(pending,
this::sendRequestWithRetry),
+ LOG, () -> "Failed* to retry " + request);
+ } else {
+ f.complete(reply);
+ }
+
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
+ }
+
+ private CompletableFuture<RaftClientReply> sendRequest(RaftClientRequest
request, int attemptCount) {
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ LOG.debug("{}: send* {}", client.getId(), request);
+ return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+ LOG.debug("{}: receive* {}", client.getId(), reply);
+ final RaftException replyException = reply != null?
reply.getException(): null;
+ reply = client.handleNotLeaderException(request, reply,
this::resetSlidingWindow);
+ if (reply != null) {
+ getSlidingWindow(request).receiveReply(
+ request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetry);
+ } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
+ handleAsyncRetryFailure(request, attemptCount, replyException);
+ }
+ return reply;
+ }).exceptionally(e -> {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(client.getId() + ": Failed* " + request, e);
+ } else {
+ LOG.debug("{}: Failed* {} with {}", client.getId(), request, e);
+ }
+ e = JavaUtils.unwrapCompletionException(e);
+ if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
+ if (!retryPolicy.shouldRetry(attemptCount, request)) {
+ handleAsyncRetryFailure(request, attemptCount, e);
+ } else {
+ client.handleIOException(request, (IOException) e, null,
this::resetSlidingWindow);
+ }
+ return null;
+ }
+ failAllAsyncRequests(request, e);
+ return null;
+ });
+ }
+
+ void assertRequestSemaphore(int expectedAvailablePermits, int
expectedQueueLength) {
+ Preconditions.assertTrue(requestSemaphore.availablePermits() ==
expectedAvailablePermits);
+ Preconditions.assertTrue(requestSemaphore.getQueueLength() ==
expectedQueueLength);
+ }
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 024f6c7..a31da14 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,27 +21,31 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.*;
import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.util.*;
-import org.apache.ratis.util.function.FunctionUtils;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeoutScheduler;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
-import static
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
-import static
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.WATCH;
-
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
@@ -70,53 +74,6 @@ final class RaftClientImpl implements RaftClient {
}
}
- static class PendingAsyncRequest extends PendingClientRequest
- implements SlidingWindow.ClientSideRequest<RaftClientReply> {
- private final Function<SlidingWindowEntry, RaftClientRequest>
requestConstructor;
- private final long seqNum;
- private volatile boolean isFirst = false;
-
- PendingAsyncRequest(long seqNum, Function<SlidingWindowEntry,
RaftClientRequest> requestConstructor) {
- this.seqNum = seqNum;
- this.requestConstructor = requestConstructor;
- }
-
- @Override
- RaftClientRequest newRequestImpl() {
- return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum,
isFirst));
- }
-
- @Override
- public void setFirstRequest() {
- isFirst = true;
- }
-
- @Override
- public long getSeqNum() {
- return seqNum;
- }
-
- @Override
- public boolean hasReply() {
- return getReplyFuture().isDone();
- }
-
- @Override
- public void setReply(RaftClientReply reply) {
- getReplyFuture().complete(reply);
- }
-
- @Override
- public void fail(Throwable e) {
- getReplyFuture().completeExceptionally(e);
- }
-
- @Override
- public String toString() {
- return "[seq=" + getSeqNum() + "]";
- }
- }
-
private final ClientId clientId;
private final RaftClientRpc clientRpc;
private final Collection<RaftPeer> peers;
@@ -125,11 +82,9 @@ final class RaftClientImpl implements RaftClient {
private volatile RaftPeerId leaderId;
- /** Map: id -> {@link SlidingWindow}, in order to support async calls to the
RAFT service or individual servers. */
- private final ConcurrentMap<String,
SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>>
- slidingWindows = new ConcurrentHashMap<>();
private final TimeoutScheduler scheduler;
- private final Semaphore asyncRequestSemaphore;
+
+ private final Supplier<OrderedAsync> orderedAsync;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy
retryPolicy) {
@@ -142,9 +97,10 @@ final class RaftClientImpl implements RaftClient {
Preconditions.assertTrue(retryPolicy != null, "retry policy can't be
null");
this.retryPolicy = retryPolicy;
- asyncRequestSemaphore = new
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler =
TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
clientRpc.addServers(peers);
+
+ this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this,
properties));
}
@Override
@@ -160,13 +116,8 @@ final class RaftClientImpl implements RaftClient {
return scheduler;
}
- private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>
getSlidingWindow(RaftClientRequest request) {
- return getSlidingWindow(request.is(STALEREAD)? request.getServerId():
null);
- }
-
- private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>
getSlidingWindow(RaftPeerId target) {
- final String id = target != null? target.toString(): "RAFT";
- return slidingWindows.computeIfAbsent(id, key -> new
SlidingWindow.Client<>(getId() + "->" + key));
+ OrderedAsync getOrderedAsync() {
+ return orderedAsync.get();
}
@Override
@@ -191,23 +142,7 @@ final class RaftClientImpl implements RaftClient {
private CompletableFuture<RaftClientReply> sendAsync(
RaftClientRequest.Type type, Message message, RaftPeerId server) {
- if (!type.is(WATCH)) {
- Objects.requireNonNull(message, "message == null");
- }
- try {
- asyncRequestSemaphore.acquire();
- } catch (InterruptedException e) {
- return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
- "Interrupted when sending " + type + ", message=" + message, e));
- }
-
- final long callId = nextCallId();
- final LongFunction<PendingAsyncRequest> constructor = seqNum -> new
PendingAsyncRequest(seqNum,
- slidingWindowEntry -> newRaftClientRequest(server, callId, message,
type, slidingWindowEntry));
- return getSlidingWindow(server).submitNewRequest(constructor,
this::sendRequestWithRetryAsync
- ).getReplyFuture(
- ).thenApply(reply -> handleRaftException(reply, CompletionException::new)
- ).whenComplete((r, e) -> asyncRequestSemaphore.release());
+ return getOrderedAsync().send(type, message, server);
}
RaftClientRequest newRaftClientRequest(
@@ -240,7 +175,7 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply send(RaftClientRequest.Type type, Message message,
RaftPeerId server)
throws IOException {
- if (!type.is(WATCH)) {
+ if (!type.is(TypeCase.WATCH)) {
Objects.requireNonNull(message, "message == null");
}
@@ -304,29 +239,6 @@ final class RaftClientImpl implements RaftClient {
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
}
- private void sendRequestWithRetryAsync(PendingAsyncRequest pending) {
- final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
- if (f.isDone()) {
- return;
- }
-
- final RaftClientRequest request = pending.newRequest();
- sendRequestAsync(request, pending.getAttemptCount()).thenAccept(reply -> {
- if (f.isDone()) {
- return;
- }
- if (reply == null) {
- final int attempt = pending.getAttemptCount();
- LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
- scheduler.onTimeout(retryPolicy.getSleepTime(attempt, request),
- () -> getSlidingWindow(request).retry(pending,
this::sendRequestWithRetryAsync),
- LOG, () -> "Failed* to retry " + request);
- } else {
- f.complete(reply);
- }
-
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
- }
-
private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest>
supplier) throws IOException {
for(int attemptCount = 1;; attemptCount++) {
final RaftClientRequest request = supplier.get();
@@ -342,7 +254,7 @@ final class RaftClientImpl implements RaftClient {
ioe = e;
}
if (!retryPolicy.shouldRetry(attemptCount, request)) {
- throw (IOException)noMoreRetries(request, attemptCount, retryPolicy,
ioe);
+ throw (IOException)noMoreRetries(request, attemptCount, ioe);
}
try {
@@ -353,53 +265,11 @@ final class RaftClientImpl implements RaftClient {
}
}
- private CompletableFuture<RaftClientReply> sendRequestAsync(
- RaftClientRequest request, int attemptCount) {
- LOG.debug("{}: send* {}", clientId, request);
- return clientRpc.sendRequestAsync(request).thenApply(reply -> {
- LOG.debug("{}: receive* {}", clientId, reply);
- final RaftException replyException = reply != null?
reply.getException(): null;
- reply = handleNotLeaderException(request, reply, true);
- if (reply != null) {
- getSlidingWindow(request).receiveReply(
- request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetryAsync);
- } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
- handleAsyncRetryFailure(request, attemptCount, replyException);
- }
- return reply;
- }).exceptionally(e -> {
- if (LOG.isTraceEnabled()) {
- LOG.trace(clientId + ": Failed* " + request, e);
- } else {
- LOG.debug("{}: Failed* {} with {}", clientId, request, e);
- }
- e = JavaUtils.unwrapCompletionException(e);
- if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- if (!retryPolicy.shouldRetry(attemptCount, request)) {
- handleAsyncRetryFailure(request, attemptCount, e);
- } else {
- handleIOException(request, (IOException) e, null, true);
- }
- return null;
- }
- failAllAsyncRequests(request, e);
- return null;
- });
- }
-
- static Throwable noMoreRetries(RaftClientRequest request, int attemptCount,
RetryPolicy policy, Throwable throwable) {
+ Throwable noMoreRetries(RaftClientRequest request, int attemptCount,
Throwable throwable) {
if (attemptCount == 1 && throwable != null) {
return throwable;
}
- return new RaftRetryFailureException(request, attemptCount, policy,
throwable);
- }
-
- private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount, Throwable throwable) {
- failAllAsyncRequests(request, noMoreRetries(request, attemptCount,
retryPolicy, throwable));
- }
-
- private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
-
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
+ return new RaftRetryFailureException(request, attemptCount, retryPolicy,
throwable);
}
private RaftClientReply sendRequest(RaftClientRequest request) throws
IOException {
@@ -410,11 +280,11 @@ final class RaftClientImpl implements RaftClient {
} catch (GroupMismatchException gme) {
throw gme;
} catch (IOException ioe) {
- handleIOException(request, ioe, null, false);
+ handleIOException(request, ioe);
throw ioe;
}
LOG.debug("{}: receive {}", clientId, reply);
- reply = handleNotLeaderException(request, reply, false);
+ reply = handleNotLeaderException(request, reply, null);
reply = handleRaftException(reply, Function.identity());
return reply;
}
@@ -435,7 +305,7 @@ final class RaftClientImpl implements RaftClient {
* otherwise return the same reply.
*/
RaftClientReply handleNotLeaderException(RaftClientRequest request,
RaftClientReply reply,
- boolean resetSlidingWindow) {
+ Consumer<RaftClientRequest> handler) {
if (reply == null) {
return null;
}
@@ -443,15 +313,15 @@ final class RaftClientImpl implements RaftClient {
if (nle == null) {
return reply;
}
- return handleNotLeaderException(request, nle, resetSlidingWindow);
+ return handleNotLeaderException(request, nle, handler);
}
RaftClientReply handleNotLeaderException(RaftClientRequest request,
NotLeaderException nle,
- boolean resetSlidingWindow) {
+ Consumer<RaftClientRequest> handler) {
refreshPeers(nle.getPeers());
final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
: nle.getSuggestedLeader().getId();
- handleIOException(request, nle, newLeader, resetSlidingWindow);
+ handleIOException(request, nle, newLeader, handler);
return null;
}
@@ -464,17 +334,20 @@ final class RaftClientImpl implements RaftClient {
}
}
+ void handleIOException(RaftClientRequest request, IOException ioe) {
+ handleIOException(request, ioe, null, null);
+ }
+
void handleIOException(RaftClientRequest request, IOException ioe,
- RaftPeerId newLeader, boolean resetSlidingWindow) {
+ RaftPeerId newLeader, Consumer<RaftClientRequest> handler) {
LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
- if (resetSlidingWindow) {
- getSlidingWindow(request).resetFirstSeqNum();
- }
+ Optional.ofNullable(handler).ifPresent(h -> h.accept(request));
+
if (ioe instanceof LeaderNotReadyException) {
return;
}
@@ -496,11 +369,6 @@ final class RaftClientImpl implements RaftClient {
clientRpc.handleException(oldLeader, ioe, changeLeader);
}
- void assertAsyncRequestSemaphore(int expectedAvailablePermits, int
expectedQueueLength) {
- Preconditions.assertTrue(asyncRequestSemaphore.availablePermits() ==
expectedAvailablePermits);
- Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() ==
expectedQueueLength);
- }
-
void assertScheduler(int numThreads) {
Preconditions.assertTrue(scheduler.getNumThreads() == numThreads);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 7426d32..62b7d32 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -25,9 +25,8 @@ import org.apache.ratis.protocol.RaftPeerId;
/** Interface for testing raft client. */
public interface RaftClientTestUtil {
- static void assertAsyncRequestSemaphore(
- RaftClient client, int expectedAvailablePermits, int
expectedQueueLength) {
- ((RaftClientImpl)
client).assertAsyncRequestSemaphore(expectedAvailablePermits,
expectedQueueLength);
+ static void assertAsyncRequestSemaphore(RaftClient client, int
expectedAvailablePermits, int expectedQueueLength) {
+ ((RaftClientImpl)
client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits,
expectedQueueLength);
}
static void assertScheduler(RaftClient client, int numThreads){
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 7637e8d..d248298 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -75,15 +75,15 @@ public interface UnorderedAsync {
try {
LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount,
reply);
final RaftException replyException = reply != null?
reply.getException(): null;
- reply = client.handleNotLeaderException(request, reply, false);
+ reply = client.handleNotLeaderException(request, reply, null);
if (reply != null) {
f.complete(reply);
return;
}
final RetryPolicy retryPolicy = client.getRetryPolicy();
if (!retryPolicy.shouldRetry(attemptCount, request)) {
- f.completeExceptionally(RaftClientImpl.noMoreRetries(
- request, attemptCount, retryPolicy, replyException != null?
replyException: e));
+ f.completeExceptionally(
+ client.noMoreRetries(request, attemptCount, replyException !=
null? replyException: e));
return;
}
@@ -97,12 +97,12 @@ public interface UnorderedAsync {
if (e instanceof IOException) {
if (e instanceof NotLeaderException) {
- client.handleNotLeaderException(request, (NotLeaderException) e,
false);
+ client.handleNotLeaderException(request, (NotLeaderException) e,
null);
} else if (e instanceof GroupMismatchException) {
f.completeExceptionally(e);
return;
} else {
- client.handleIOException(request, (IOException) e, null, false);
+ client.handleIOException(request, (IOException) e);
}
} else {
if (!client.getClientRpc().handleException(request.getServerId(),
e, false)) {