This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9c1638d RATIS-704. Invoke sendAsync as soon as OrderedAsync is
created. Contributed by Tsz Wo Nicholas Sze.
9c1638d is described below
commit 9c1638dbe6ef31246d780331b0cc8d27a8dc0c60
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Oct 24 12:48:08 2019 +0530
RATIS-704. Invoke sendAsync as soon as OrderedAsync is created. Contributed
by Tsz Wo Nicholas Sze.
---
.../apache/ratis/client/RaftClientConfigKeys.java | 15 ++++++++++++-
.../org/apache/ratis/client/impl/OrderedAsync.java | 26 ++++++++++++++++------
.../apache/ratis/client/impl/RaftClientImpl.java | 2 +-
.../apache/ratis/protocol/RaftClientRequest.java | 5 +++++
.../java/org/apache/ratis/util/PeerProxyMap.java | 1 +
.../apache/ratis/grpc/client/GrpcClientRpc.java | 2 +-
.../org/apache/ratis/RaftAsyncExceptionTests.java | 20 ++++++++++-------
.../test/java/org/apache/ratis/RaftAsyncTests.java | 14 +++++++++---
.../test/java/org/apache/ratis/RaftBasicTests.java | 5 +++--
9 files changed, 67 insertions(+), 23 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index bb01249..d2c3679 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -62,6 +62,19 @@ public interface RaftClientConfigKeys {
static void setMaxOutstandingRequests(RaftProperties properties, int
outstandingRequests) {
setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY,
outstandingRequests);
}
+
+ interface Experimental {
+ String PREFIX = Async.PREFIX + "." +
Experimental.class.getSimpleName().toLowerCase();
+
+ String SEND_DUMMY_REQUEST_KEY = PREFIX + ".send-dummy-request";
+ boolean SEND_DUMMY_REQUEST_DEFAULT = true;
+ static boolean sendDummyRequest(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, SEND_DUMMY_REQUEST_KEY,
SEND_DUMMY_REQUEST_DEFAULT, getDefaultLog());
+ }
+ static void setSendDummyRequest(RaftProperties properties, boolean
sendDummyRequest) {
+ setBoolean(properties::setBoolean, SEND_DUMMY_REQUEST_KEY,
sendDummyRequest);
+ }
+ }
}
static void main(String[] args) {
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 7694450..79ee050 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -53,8 +53,8 @@ import java.util.function.Function;
import java.util.function.LongFunction;
/** Send ordered asynchronous requests to a raft service. */
-class OrderedAsync {
- private static final Logger LOG =
LoggerFactory.getLogger(OrderedAsync.class);
+public final class OrderedAsync {
+ public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
static class PendingOrderedRequest extends PendingClientRequest
implements SlidingWindow.ClientSideRequest<RaftClientReply> {
@@ -110,13 +110,23 @@ class OrderedAsync {
}
}
+ static OrderedAsync newInstance(RaftClientImpl client, RaftProperties
properties) {
+ final OrderedAsync ordered = new OrderedAsync(client, properties);
+ // send a dummy watch request to establish the connection
+ // TODO: this is a work around, it is better to fix the underlying RPC implementation
+ if (RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties)) {
+ ordered.send(RaftClientRequest.watchRequestType(), null, null);
+ }
+ return ordered;
+ }
+
private final RaftClientImpl client;
/** Map: id -> {@link SlidingWindow}, in order to support async calls to the
Raft service or individual servers. */
private final ConcurrentMap<String,
SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>> slidingWindows
= new ConcurrentHashMap<>();
private final Semaphore requestSemaphore;
- OrderedAsync(RaftClientImpl client, RaftProperties properties) {
+ private OrderedAsync(RaftClientImpl client, RaftProperties properties) {
this.client = Objects.requireNonNull(client, "client == null");
this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
}
@@ -170,6 +180,7 @@ class OrderedAsync {
final RaftClientRequest request = pending.newRequestImpl();
if (request == null) { // already done
+ LOG.debug("{} newRequestImpl returns null", pending);
return;
}
@@ -198,14 +209,15 @@ class OrderedAsync {
private void scheduleWithTimeout(PendingOrderedRequest pending,
RaftClientRequest request, RetryPolicy retryPolicy) {
final int attempt = pending.getAttemptCount();
- LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
- scheduleWithTimeout(pending, request.getServerId(), sleepTime);
+ LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}",
attempt, sleepTime, retryPolicy, request);
+ scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
}
- private void scheduleWithTimeout(PendingOrderedRequest pending, RaftPeerId
serverId, TimeDuration sleepTime) {
+ private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration
sleepTime,
+ SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>
slidingWindow) {
client.getScheduler().onTimeout(sleepTime,
- () -> getSlidingWindow(serverId).retry(pending,
this::sendRequestWithRetry),
+ () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
LOG, () -> "Failed* to retry " + pending);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 582b6b6..e1df86b 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -99,7 +99,7 @@ final class RaftClientImpl implements RaftClient {
scheduler = TimeoutScheduler.getInstance();
clientRpc.addServers(peers);
- this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this,
properties));
+ this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this,
properties));
}
@Override
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 4c10c0c..a9e5064 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -30,6 +30,8 @@ import static
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
*/
public class RaftClientRequest extends RaftClientMessage {
private static final Type WRITE_DEFAULT = new
Type(WriteRequestTypeProto.getDefaultInstance());
+ private static final Type WATCH_DEFAULT = new Type(
+
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
private static final Type DEFAULT_READ = new
Type(ReadRequestTypeProto.getDefaultInstance());
private static final Type DEFAULT_STALE_READ = new
Type(StaleReadRequestTypeProto.getDefaultInstance());
@@ -47,6 +49,9 @@ public class RaftClientRequest extends RaftClientMessage {
: new
Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
}
+ public static Type watchRequestType() {
+ return WATCH_DEFAULT;
+ }
public static Type watchRequestType(long index, ReplicationLevel
replication) {
return new
Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build());
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index ea0c2f1..6327adb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -98,6 +98,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements
Closeable {
}
public PROXY getProxy(RaftPeerId id) throws IOException {
+ Objects.requireNonNull(id, "id == null");
PeerAndProxy p = peers.get(id);
if (p == null) {
synchronized (resetLock) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 5883598..1fceb1b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -66,7 +66,7 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
// Reuse the same grpc stream for all async calls.
return proxy.getOrderedStreamObservers().onNext(request);
- } catch (IOException e) {
+ } catch (Throwable e) {
return JavaUtils.completeExceptionally(e);
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index cdb0e6b..4e8f68e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -17,9 +17,11 @@
*/
package org.apache.ratis;
-import org.apache.ratis.client.RaftClient;
+import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.OrderedAsync;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -27,7 +29,7 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.LogUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -36,20 +38,22 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
+ LogUtils.setLogLevel(OrderedAsync.LOG, Level.DEBUG);
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
}
@Test
public void testGroupMismatchException() throws Exception {
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
false);
runWithNewCluster(1, this::runTestGroupMismatchException);
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
true);
}
private void runTestGroupMismatchException(CLUSTER cluster) throws Exception
{
@@ -95,24 +99,24 @@ public abstract class RaftAsyncExceptionTests<CLUSTER
extends MiniRaftCluster>
private void runTestTimeoutException(CLUSTER cluster) throws Exception {
// send a message to make sure the cluster is working
try(RaftClient client = cluster.createClient()) {
- client.send(new SimpleMessage("m0"));
+ final RaftClientReply reply = client.send(new SimpleMessage("m0"));
+ Assert.assertTrue(reply.isSuccess());
- RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(),
- TimeDuration.valueOf(3, TimeUnit.SECONDS));
+ RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(), ONE_SECOND);
// Block StartTransaction
cluster.getServers().stream()
.map(cluster::getRaftServerImpl)
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockStartTransaction);
final CompletableFuture<RaftClientReply> replyFuture =
client.sendAsync(new SimpleMessage("m1"));
- Thread.sleep(10000);
+ FIVE_SECONDS.sleep();
// Unblock StartTransaction
cluster.getServers().stream()
.map(cluster::getRaftServerImpl)
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockStartTransaction);
// The request should succeed after start transaction is unblocked
- Assert.assertTrue(replyFuture.get().isSuccess());
+ Assert.assertTrue(replyFuture.get(FIVE_SECONDS.getDuration(),
FIVE_SECONDS.getUnit()).isSuccess());
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 4ab5282..8760147 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -105,12 +105,18 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
@Test
public void testRequestAsyncWithRetryFailure() throws Exception {
- runWithNewCluster(1, false, cluster ->
runTestRequestAsyncWithRetryFailure(false, cluster));
+ runTestRequestAsyncWithRetryFailure(false);
}
@Test
public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws
Exception {
- runWithNewCluster(1, true, cluster ->
runTestRequestAsyncWithRetryFailure(true, cluster));
+ runTestRequestAsyncWithRetryFailure(true);
+ }
+
+ void runTestRequestAsyncWithRetryFailure(boolean initialMessages) throws
Exception {
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
false);
+ runWithNewCluster(1, initialMessages, cluster ->
runTestRequestAsyncWithRetryFailure(initialMessages, cluster));
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
true);
}
void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER
cluster) throws Exception {
@@ -345,11 +351,13 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
@Test
public void testRequestTimeout() throws Exception {
final TimeDuration oldExpiryTime =
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
- RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
TimeDuration.valueOf(5, TimeUnit.SECONDS));
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
FIVE_SECONDS);
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
false);
runWithNewCluster(NUM_SERVERS, cluster ->
RaftBasicTests.testRequestTimeout(true, cluster, LOG));
//reset for the other tests
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
oldExpiryTime);
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
true);
}
@Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index a3cab64..f07eb7c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -37,6 +37,7 @@ import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -407,7 +408,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
public static void testRequestTimeout(boolean async, MiniRaftCluster
cluster, Logger LOG) throws Exception {
waitForLeader(cluster);
- long time = System.currentTimeMillis();
+ final Timestamp startTime = Timestamp.currentTime();
try (final RaftClient client = cluster.createClient()) {
// Get the next callId to be used by the client
long callId = RaftClientTestUtil.getCallId(client);
@@ -428,7 +429,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
// Eventually the request would be accepted by the server
// when the retry cache entry is invalidated.
// The duration for which the client waits should be more than the retryCacheExpiryDuration.
- TimeDuration duration = TimeDuration.valueOf(System.currentTimeMillis()
- time, TimeUnit.MILLISECONDS);
+ final TimeDuration duration = startTime.elapsedTime();
TimeDuration retryCacheExpiryDuration =
RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
}