Repository: incubator-ratis Updated Branches: refs/heads/master 59de6bd6b -> a82cd7b12
RATIS-234. Add a feature to watch if a request is replicated/committed to a particular ReplicationLevel. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a82cd7b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a82cd7b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a82cd7b1 Branch: refs/heads/master Commit: a82cd7b126f432cb2851e61155042c9f9ecfdb79 Parents: 59de6bd Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Wed Oct 10 16:29:44 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Wed Oct 10 16:29:44 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 6 + .../ratis/client/RaftClientConfigKeys.java | 3 + .../ratis/client/impl/ClientProtoUtils.java | 17 +- .../ratis/client/impl/RaftClientImpl.java | 27 ++- .../apache/ratis/protocol/RaftClientReply.java | 16 +- .../ratis/protocol/RaftClientRequest.java | 76 +++++--- .../ratis/protocol/ServerInformationReply.java | 2 +- .../org/apache/ratis/util/PeerProxyMap.java | 4 + .../ratis/grpc/TestWatchRequestWithGrpc.java | 25 +++ ratis-proto/src/main/proto/Raft.proto | 13 ++ .../apache/ratis/server/impl/LeaderState.java | 37 +++- .../apache/ratis/server/impl/LogAppender.java | 6 +- .../ratis/server/impl/PendingRequest.java | 6 +- .../ratis/server/impl/PendingRequests.java | 34 ++-- .../ratis/server/impl/RaftServerImpl.java | 25 ++- .../ratis/server/impl/ServerProtoUtils.java | 2 +- .../apache/ratis/server/impl/WatchRequests.java | 136 +++++++++++++ .../org/apache/ratis/WatchRequestTests.java | 193 +++++++++++++++++++ .../SimpleStateMachine4Testing.java | 15 +- 19 files changed, 571 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 5af28bb..0b8fdb0 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -66,6 +66,9 @@ public interface RaftClient extends Closeable { /** Async call to send the given stale-read message to the given server (not the raft service). */ CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server); + /** Async call to watch the given index to satisfy the given replication level. */ + CompletableFuture<RaftClientReply> sendWatchAsync(long index, ReplicationLevel replication); + /** * Send the given message to the raft service. * The message may change the state of the service. @@ -88,6 +91,9 @@ public interface RaftClient extends Closeable { /** Send the given stale-read message to the given server (not the raft service). */ RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server) throws IOException; + /** Watch the given index to satisfy the given replication level. */ + RaftClientReply sendWatch(long index, ReplicationLevel replication) throws IOException; + /** Send set configuration request to the raft service. 
*/ RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index b54f15c..10fc69d 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -45,6 +45,9 @@ public interface RaftClientConfigKeys { return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog()); } + static void setRetryInterval(RaftProperties properties, TimeDuration timeoutDuration) { + setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY, timeoutDuration); + } String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout"; TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index f2a1e90..a9e509a 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -76,6 +76,8 @@ public interface ClientProtoUtils { return RaftClientRequest.Type.valueOf(p.getRead()); case STALEREAD: return RaftClientRequest.Type.valueOf(p.getStaleRead()); + case WATCH: + return 
RaftClientRequest.Type.valueOf(p.getWatch()); default: throw new IllegalArgumentException("Unexpected request type: " + p.getTypeCase() + " in request proto " + p); @@ -98,8 +100,10 @@ public interface ClientProtoUtils { static RaftClientRequestProto toRaftClientRequestProto( RaftClientRequest request) { final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) - .setMessage(toClientMessageEntryProtoBuilder(request.getMessage())); + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); + if (request.getMessage() != null) { + b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage())); + } final RaftClientRequest.Type type = request.getType(); switch (type.getTypeCase()) { @@ -112,6 +116,9 @@ public interface ClientProtoUtils { case STALEREAD: b.setStaleRead(type.getStaleRead()); break; + case WATCH: + b.setWatch(type.getWatch()); + break; default: throw new IllegalArgumentException("Unexpected request type: " + request.getType() + " in request " + request); @@ -131,13 +138,13 @@ public interface ClientProtoUtils { .build(); } - static RaftClientReplyProto toRaftClientReplyProto( - RaftClientReply reply) { + static RaftClientReplyProto toRaftClientReplyProto(RaftClientReply reply) { final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder(); if (reply != null) { b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(), reply.getServerId().toByteString(), reply.getRaftGroupId(), reply.getCallId(), reply.isSuccess())); + b.setLogIndex(reply.getLogIndex()); if (reply.getMessage() != null) { b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage())); } @@ -223,7 +230,7 @@ public interface ClientProtoUtils { return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), groupId, rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e, - replyProto.getCommitInfosList()); + replyProto.getLogIndex(), 
replyProto.getCommitInfosList()); } static ServerInformationReply toServerInformationReply( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index aa4e9c8..93c5caa 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -40,6 +40,7 @@ import java.util.function.Supplier; import java.util.stream.Stream; import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD; +import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.WATCH; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { @@ -149,14 +150,21 @@ final class RaftClientImpl implements RaftClient { return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), message, server); } + @Override + public CompletableFuture<RaftClientReply> sendWatchAsync(long index, ReplicationLevel replication) { + return sendAsync(RaftClientRequest.watchRequestType(index, replication), null, null); + } + private CompletableFuture<RaftClientReply> sendAsync( RaftClientRequest.Type type, Message message, RaftPeerId server) { - Objects.requireNonNull(message, "message == null"); + if (!type.is(WATCH)) { + Objects.requireNonNull(message, "message == null"); + } try { asyncRequestSemaphore.acquire(); } catch (InterruptedException e) { throw new CompletionException(IOUtils.toInterruptedIOException( - "Interrupted when sending " + message, e)); + "Interrupted when sending " + type + ", message=" + message, e)); } final long callId = nextCallId(); final LongFunction<PendingAsyncRequest> 
constructor = seqNum -> new PendingAsyncRequest(seqNum, @@ -189,9 +197,16 @@ final class RaftClientImpl implements RaftClient { return send(RaftClientRequest.staleReadRequestType(minIndex), message, server); } + @Override + public RaftClientReply sendWatch(long index, ReplicationLevel replication) throws IOException { + return send(RaftClientRequest.watchRequestType(index, replication), null, null); + } + private RaftClientReply send(RaftClientRequest.Type type, Message message, RaftPeerId server) throws IOException { - Objects.requireNonNull(message, "message == null"); + if (!type.is(WATCH)) { + Objects.requireNonNull(message, "message == null"); + } final long callId = nextCallId(); return sendRequestWithRetry(() -> newRaftClientRequest( @@ -292,7 +307,11 @@ final class RaftClientImpl implements RaftClient { } return reply; }).exceptionally(e -> { - LOG.debug("{}: Failed {} with {}", clientId, request, e); + if (LOG.isTraceEnabled()) { + LOG.trace(clientId + ": Failed " + request, e); + } else { + LOG.debug("{}: Failed {} with {}", clientId, request, e); + } e = JavaUtils.unwrapCompletionException(e); if (e instanceof GroupMismatchException) { throw new CompletionException(e); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 5175482..4c290ff 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -43,18 +43,20 @@ public class RaftClientReply extends RaftClientMessage { private final RaftException exception; private final Message message; + private final long logIndex; /** The commit information when the reply is created. 
*/ private final Collection<CommitInfoProto> commitInfos; public RaftClientReply( ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, boolean success, Message message, RaftException exception, - Collection<CommitInfoProto> commitInfos) { + long logIndex, Collection<CommitInfoProto> commitInfos) { super(clientId, serverId, groupId); this.success = success; this.callId = callId; this.message = message; this.exception = exception; + this.logIndex = logIndex; this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList(); if (exception != null) { @@ -68,7 +70,7 @@ public class RaftClientReply extends RaftClientMessage { public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) { this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), - request.getCallId(), false, null, exception, commitInfos); + request.getCallId(), false, null, exception, 0L, commitInfos); } public RaftClientReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos) { @@ -77,12 +79,12 @@ public class RaftClientReply extends RaftClientMessage { public RaftClientReply(RaftClientRequest request, Message message, Collection<CommitInfoProto> commitInfos) { this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), - request.getCallId(), true, message, null, commitInfos); + request.getCallId(), true, message, null, 0L, commitInfos); } public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) { this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(), - reply.getCallId(), false, reply.getMessage(), nre, reply.getCommitInfos()); + reply.getCallId(), false, reply.getMessage(), nre, reply.getLogIndex(), reply.getCommitInfos()); } /** @@ -104,11 +106,15 @@ public class RaftClientReply extends RaftClientMessage { return callId; } + public long getLogIndex() { + return logIndex; + } + @Override public String toString() { return 
super.toString() + ", cid=" + getCallId() + ", " + (isSuccess()? "SUCCESS": "FAILED " + exception) - + ", commits" + ProtoUtils.toString(commitInfos); + + ", logIndex=" + getLogIndex() + ", commits" + ProtoUtils.toString(commitInfos); } public boolean isSuccess() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index a9fe740..48d203a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -53,7 +53,11 @@ public class RaftClientRequest extends RaftClientMessage { : new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build()); } - /** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */ + public static Type watchRequestType(long index, ReplicationLevel replication) { + return new Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build()); + } + + /** The type of a request (oneof write, read, staleRead, watch; see the message RaftClientRequestProto). */ public static class Type { public static Type valueOf(WriteRequestTypeProto write) { return writeRequestType(write.getReplication()); @@ -68,35 +72,41 @@ public class RaftClientRequest extends RaftClientMessage { : new Type(staleRead); } + public static Type valueOf(WatchRequestTypeProto watch) { + return watchRequestType(watch.getIndex(), watch.getReplication()); + } + /** * The type case of the proto. * Only the corresponding proto (must be non-null) is used. * The other protos are ignored. 
*/ private final RaftClientRequestProto.TypeCase typeCase; - private final WriteRequestTypeProto write; - private final ReadRequestTypeProto read; - private final StaleReadRequestTypeProto staleRead; + private final Object proto; + + private Type(RaftClientRequestProto.TypeCase typeCase, Object proto) { + this.typeCase = Objects.requireNonNull(typeCase, "typeCase == null"); + this.proto = Objects.requireNonNull(proto, "proto == null"); + } private Type(WriteRequestTypeProto write) { - this.typeCase = WRITE; - this.write = Objects.requireNonNull(write); - this.read = null; - this.staleRead = null; + this(WRITE, write); } private Type(ReadRequestTypeProto read) { - this.typeCase = READ; - this.write = null; - this.read = Objects.requireNonNull(read); - this.staleRead = null; + this(READ, read); } private Type(StaleReadRequestTypeProto staleRead) { - this.typeCase = STALEREAD; - this.write = null; - this.read = null; - this.staleRead = Objects.requireNonNull(staleRead); + this(STALEREAD, staleRead); + } + + private Type(WatchRequestTypeProto watch) { + this(WATCH, watch); + } + + public boolean is(RaftClientRequestProto.TypeCase typeCase) { + return getTypeCase().equals(typeCase); } public RaftClientRequestProto.TypeCase getTypeCase() { @@ -104,30 +114,44 @@ public class RaftClientRequest extends RaftClientMessage { } public WriteRequestTypeProto getWrite() { - Preconditions.assertTrue(typeCase == WRITE); - return write; + Preconditions.assertTrue(is(WRITE)); + return (WriteRequestTypeProto)proto; } public ReadRequestTypeProto getRead() { - Preconditions.assertTrue(typeCase == READ); - return read; + Preconditions.assertTrue(is(READ)); + return (ReadRequestTypeProto)proto; } public StaleReadRequestTypeProto getStaleRead() { - Preconditions.assertTrue(typeCase == STALEREAD); - return staleRead; + Preconditions.assertTrue(is(STALEREAD)); + return (StaleReadRequestTypeProto)proto; + } + + public WatchRequestTypeProto getWatch() { + Preconditions.assertTrue(is(WATCH)); + 
return (WatchRequestTypeProto)proto; + } + + static String toString(ReplicationLevel replication) { + return replication == ReplicationLevel.MAJORITY? "": "-" + replication; + } + + public static String toString(WatchRequestTypeProto w) { + return "Watch" + toString(w.getReplication()) + "(" + w.getIndex() + ")"; } @Override public String toString() { switch (typeCase) { case WRITE: - final ReplicationLevel replication = write.getReplication(); - return "RW" + (replication == ReplicationLevel.MAJORITY? "": "-" + replication); + return "RW" + toString(getWrite().getReplication()); case READ: return "RO"; case STALEREAD: - return "StaleRead(" + staleRead.getMinIndex() + ")"; + return "StaleRead(" + getStaleRead().getMinIndex() + ")"; + case WATCH: + return toString(getWatch()); default: throw new IllegalStateException("Unexpected request type: " + typeCase); } @@ -177,7 +201,7 @@ public class RaftClientRequest extends RaftClientMessage { } public boolean is(RaftClientRequestProto.TypeCase typeCase) { - return getType().getTypeCase() == typeCase; + return getType().is(typeCase); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java index 18bf1c0..45a94ea 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java @@ -43,7 +43,7 @@ public class ServerInformationReply extends RaftClientReply { ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, boolean success, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { - 
super(clientId, serverId, groupId, callId, success, null, null, commitInfos); + super(clientId, serverId, groupId, callId, success, null, null, 0L, commitInfos); this.roleInfoProto = roleInfoProto; this.isRaftStorageHealthy = isRaftStorageHealthy; this.group = group; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java index 53a2936..d3d80ef 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java @@ -51,6 +51,10 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { if (proxy == null) { synchronized (this) { if (proxy == null) { + final LifeCycle.State current = lifeCycle.getCurrentState(); + if (current.isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED)) { + throw new IOException(name + " is already " + current); + } lifeCycle.startAndTransition( () -> proxy = createProxy.apply(peer), IOException.class); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java new file mode 100644 index 0000000..7b9061b --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.WatchRequestTests; + +public class TestWatchRequestWithGrpc + extends WatchRequestTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index e0916fd..d9a4f08 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -174,8 +174,14 @@ message ClientMessageEntryProto { } enum ReplicationLevel { + /** Committed at the leader and replicated to the majority of peers. */ MAJORITY = 0; + /** Committed at the leader and replicated to all peers. + Note that ReplicationLevel.ALL implies ReplicationLevel.MAJORITY. */ ALL = 1; + /** Committed at all peers. + Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL. 
*/ + ALL_COMMITTED = 2; } @@ -197,6 +203,11 @@ message StaleReadRequestTypeProto { uint64 minIndex = 1; } +message WatchRequestTypeProto { + uint64 index = 1; + ReplicationLevel replication = 2; +} + // normal client request message RaftClientRequestProto { RaftRpcRequestProto rpcRequest = 1; @@ -206,6 +217,7 @@ message RaftClientRequestProto { WriteRequestTypeProto write = 3; ReadRequestTypeProto read = 4; StaleReadRequestTypeProto staleRead = 5; + WatchRequestTypeProto watch = 6; } } @@ -236,6 +248,7 @@ message RaftClientReplyProto { StateMachineExceptionProto stateMachineException = 5; } + uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction repeated CommitInfoProto commitInfos = 15; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index b4b613e..48f0c1c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -173,6 +175,7 @@ public class 
LeaderState { private final EventQueue eventQueue = new EventQueue(); private final EventProcessor processor; private final PendingRequests pendingRequests; + private final WatchRequests watchRequests; private volatile boolean running = true; private final int stagingCatchupGap; @@ -189,7 +192,8 @@ public class LeaderState { this.raftLog = state.getLog(); this.currentTerm = state.getCurrentTerm(); processor = new EventProcessor(); - pendingRequests = new PendingRequests(server); + this.pendingRequests = new PendingRequests(server.getId()); + this.watchRequests = new WatchRequests(server); final RaftConfiguration conf = server.getRaftConf(); Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); @@ -229,8 +233,12 @@ public class LeaderState { this.running = false; // do not interrupt event processor since it may be in the middle of logSync senders.forEach(LogAppender::stopAppender); + final NotLeaderException nle = server.generateNotLeaderException(); + final Collection<CommitInfoProto> commitInfos = server.getCommitInfos(); try { - pendingRequests.sendNotLeaderResponses(); + final Collection<TransactionContext> transactions = pendingRequests.sendNotLeaderResponses(nle, commitInfos); + server.getStateMachine().notifyNotLeader(transactions); + watchRequests.failWatches(nle); } catch (IOException e) { LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e); } @@ -286,6 +294,24 @@ public class LeaderState { return pendingRequests.addPendingRequest(index, request, entry); } + CompletableFuture<Void> addWatchReqeust(RaftClientRequest request) { + LOG.debug("{}: addWatchRequest {}", server.getId(), request); + return watchRequests.add(request.getType().getWatch()); + } + + void commitIndexChanged() { + final long leader = raftLog.getLastCommittedIndex(); + final long min = senders.stream() + .map(LogAppender::getFollower) + .map(FollowerInfo::getCommitIndex) + .min(Long::compare) + .orElse(leader); // it happens only if senders.isEmpty() 
+ Preconditions.assertTrue(leader >= min); // leader commit index should always be ahead followers + + watchRequests.update(ReplicationLevel.MAJORITY, leader); + watchRequests.update(ReplicationLevel.ALL_COMMITTED, min); + } + private void applyOldNewConf() { final ServerState state = server.getState(); final RaftConfiguration current = server.getRaftConf(); @@ -510,10 +536,13 @@ public class LeaderState { // the log gets purged after the statemachine does a snapshot final TermIndex[] entriesToCommit = raftLog.getEntries( oldLastCommitted + 1, majority + 1); - server.getState().updateStatemachine(majority, currentTerm); + if (server.getState().updateStatemachine(majority, currentTerm)) { + commitIndexChanged(); + } checkAndUpdateConfiguration(entriesToCommit); } + watchRequests.update(ReplicationLevel.ALL, min); pendingRequests.checkDelayedReplies(min); } @@ -533,7 +562,7 @@ public class LeaderState { if (conf.isTransitional()) { replicateNewConf(); } else { // the (new) log entry has been committed - pendingRequests.replySetConfiguration(); + pendingRequests.replySetConfiguration(server::getCommitInfos); // if the leader is not included in the current configuration, step down if (!conf.containsInConf(server.getId())) { LOG.info("{} is not included in the new configuration {}. 
Step down.", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 4dff3e5..f26a48c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -244,7 +244,7 @@ public class LogAppender { throw e; } catch (IOException ioe) { // TODO should have more detailed retry policy here. - if (retry % 10 == 1) { // to reduce the number of messages + if (retry++ % 10 == 0) { // to reduce the number of messages LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe); } handleException(ioe); @@ -257,7 +257,9 @@ public class LogAppender { } protected void updateCommitIndex(long commitIndex) { - follower.updateCommitIndex(commitIndex); + if (follower.updateCommitIndex(commitIndex)) { + server.commitIndexChanged(); + } } protected class SnapshotRequestIter http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index a95184a..35e3082 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -17,12 +17,14 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.*; import org.apache.ratis.server.impl.RetryCache.CacheEntry; 
import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; +import java.util.Collection; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -111,8 +113,8 @@ public class PendingRequest implements Comparable<PendingRequest> { setReply(delayed.fail(e)); } - TransactionContext setNotLeaderException(NotLeaderException nle) { - setReply(new RaftClientReply(getRequest(), nle, null)); + TransactionContext setNotLeaderException(NotLeaderException nle, Collection<CommitInfoProto> commitInfos) { + setReply(new RaftClientReply(getRequest(), nle, commitInfos)); return getEntry(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index 9a4ed74..fce1cf1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.*; import org.apache.ratis.server.impl.RetryCache.CacheEntry; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; @@ -26,12 +27,12 @@ import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collection; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; class PendingRequests 
{ @@ -63,11 +64,11 @@ class PendingRequests { return r; } - Collection<TransactionContext> setNotLeaderException(NotLeaderException nle) { + Collection<TransactionContext> setNotLeaderException(NotLeaderException nle, Collection<CommitInfoProto> commitInfos) { LOG.debug("{}: PendingRequests.setNotLeaderException", name); try { return map.values().stream() - .map(p -> p.setNotLeaderException(nle)) + .map(p -> p.setNotLeaderException(nle, commitInfos)) .collect(Collectors.toList()); } finally { map.clear(); @@ -131,16 +132,16 @@ class PendingRequests { } private PendingRequest pendingSetConf; - private final RaftServerImpl server; + private final String name; private final RequestMap pendingRequests; private PendingRequest last = null; private final DelayedReplies delayedReplies; - PendingRequests(RaftServerImpl server) { - this.server = server; - this.pendingRequests = new RequestMap(server.getId()); - this.delayedReplies = new DelayedReplies(server.getId()); + PendingRequests(RaftPeerId id) { + this.name = id + "-" + getClass().getSimpleName(); + this.pendingRequests = new RequestMap(id); + this.delayedReplies = new DelayedReplies(id); } PendingRequest addPendingRequest(long index, RaftClientRequest request, @@ -169,16 +170,16 @@ class PendingRequests { return pendingSetConf; } - void replySetConfiguration() { + void replySetConfiguration(Supplier<Collection<CommitInfoProto>> getCommitInfos) { // we allow the pendingRequest to be null in case that the new leader // commits the new configuration while it has not received the retry // request from the client if (pendingSetConf != null) { final RaftClientRequest request = pendingSetConf.getRequest(); - LOG.debug("{}: sends success for {}", server.getId(), request); + LOG.debug("{}: sends success for {}", name, request); // for setConfiguration we do not need to wait for statemachine. send back // reply after it's committed. 
- pendingSetConf.setReply(new RaftClientReply(request, server.getCommitInfos())); + pendingSetConf.setReply(new RaftClientReply(request, getCommitInfos.get())); pendingSetConf = null; } } @@ -217,16 +218,15 @@ class PendingRequests { * The leader state is stopped. Send NotLeaderException to all the pending * requests since they have not got applied to the state machine yet. */ - void sendNotLeaderResponses() throws IOException { - LOG.info("{}: sendNotLeaderResponses", server.getId()); + Collection<TransactionContext> sendNotLeaderResponses(NotLeaderException nle, Collection<CommitInfoProto> commitInfos) { + LOG.info("{}: sendNotLeaderResponses", name); - // notify the state machine about stepping down - final NotLeaderException nle = server.generateNotLeaderException(); - server.getStateMachine().notifyNotLeader(pendingRequests.setNotLeaderException(nle)); + final Collection<TransactionContext> transactions = pendingRequests.setNotLeaderException(nle, commitInfos); if (pendingSetConf != null) { - pendingSetConf.setNotLeaderException(nle); + pendingSetConf.setNotLeaderException(nle, commitInfos); } delayedReplies.failReplies(); + return transactions; } void checkDelayedReplies(long allAckedIndex) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2d7f85a..a1c1192 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -520,6 +520,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return processQueryFuture(stateMachine.query(request.getMessage()), request); } + if 
(request.is(RaftClientRequestProto.TypeCase.WATCH)) { + return watchAsync(request); + } + // query the retry cache RetryCache.CacheQueryResult previousResult = retryCache.queryCache( request.getClientId(), request.getCallId()); @@ -543,6 +547,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return appendTransaction(request, context, cacheEntry); } + private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) { + return role.getLeaderState() + .map(ls -> ls.addWatchReqeust(request).thenApply(v -> new RaftClientReply(request, getCommitInfos()))) + .orElseGet(() -> CompletableFuture.completedFuture( + new RaftClientReply(request, generateNotLeaderException(), getCommitInfos()))); + } + private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) { final long minIndex = request.getType().getStaleRead().getMinIndex(); final long commitIndex = state.getLog().getLastCommittedIndex(); @@ -999,6 +1010,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou groupId, term, lastEntry); } + void commitIndexChanged() { + role.getLeaderState().ifPresent(LeaderState::commitIndexChanged); + } + public void submitUpdateCommitEvent() { role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent); } @@ -1022,15 +1037,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey())); } + final long logIndex = logEntry.getIndex(); return stateMachineFuture.whenComplete((reply, exception) -> { final RaftClientReply r; if (exception == null) { - r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null, getCommitInfos()); + r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null, logIndex, getCommitInfos()); } else { // the exception is coming from the state machine. 
wrap it into the // reply as a StateMachineException final StateMachineException e = new StateMachineException(getId(), exception); - r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, getCommitInfos()); + r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, logIndex, getCommitInfos()); } // update pending request @@ -1040,7 +1056,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (isLeader() && leaderState != null) { // is leader and is running // For leader, update cache unless the reply is delayed. // When a reply is delayed, the cache will be updated in DelayedReply.getReply(). - updateCache = leaderState.replyPendingRequest(logEntry.getIndex(), r, cacheEntry); + updateCache = leaderState.replyPendingRequest(logIndex, r, cacheEntry); } } if (updateCache) { @@ -1091,7 +1107,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, logEntry.getCallId()); if (cacheEntry != null) { final RaftClientReply reply = new RaftClientReply(clientId, getId(), getGroupId(), - logEntry.getCallId(), false, null, generateNotLeaderException(), getCommitInfos()); + logEntry.getCallId(), false, null, generateNotLeaderException(), + logEntry.getIndex(), getCommitInfos()); cacheEntry.failWithReply(reply); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 6fbd43a..c75b6ed 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -77,7 +77,7 
@@ public class ServerProtoUtils { public static String toString(AppendEntriesReplyProto reply) { return toString(reply.getServerReply()) + "," + reply.getResult() + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm() - + ",followerCommit" + reply.getFollowerCommit(); + + ",followerCommit:" + reply.getFollowerCommit(); } private static String toString(RaftRpcReplyProto reply) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java new file mode 100644 index 0000000..b7d6635 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; + +class WatchRequests { + public static final Logger LOG = LoggerFactory.getLogger(WatchRequests.class); + + static class PendingWatch { + private final WatchRequestTypeProto watch; + private final CompletableFuture<Void> future = new CompletableFuture<>(); + + PendingWatch(WatchRequestTypeProto watch) { + this.watch = watch; + } + + CompletableFuture<Void> getFuture() { + return future; + } + + long getIndex() { + return watch.getIndex(); + } + + @Override + public String toString() { + return RaftClientRequest.Type.toString(watch); + } + } + + private class WatchQueue { + private final ReplicationLevel replication; + private final PriorityQueue<PendingWatch> q = new PriorityQueue<>(Comparator.comparing(PendingWatch::getIndex)); + private volatile long index; //Invariant: q.isEmpty() or index < any element q + + WatchQueue(ReplicationLevel replication) { + this.replication = replication; + } + + long getIndex() { + return index; + } + + synchronized boolean offer(PendingWatch pending) { + if (pending.getIndex() > getIndex()) { // compare again synchronized + final boolean offered = q.offer(pending); + Preconditions.assertTrue(offered); + return true; + } + return false; + } + + synchronized void updateIndex(final long newIndex) { + if (newIndex <= getIndex()) { // compare again synchronized + return; + } + LOG.debug("{}: update {} index from {} to {}", name, replication, index, newIndex); + index = newIndex; + + for(;;) { + final PendingWatch peeked = 
q.peek(); + if (peeked == null || peeked.getIndex() > newIndex) { + return; + } + final PendingWatch polled = q.poll(); + Preconditions.assertTrue(polled == peeked); + LOG.debug("{}: complete {}", name, polled); + polled.getFuture().complete(null); + } + } + + synchronized void failAll(Exception e) { + for(; !q.isEmpty(); ) { + q.poll().getFuture().completeExceptionally(e); + } + } + } + + private final String name; + private final Map<ReplicationLevel, WatchQueue> queues = new EnumMap<>(ReplicationLevel.class); + + WatchRequests(Object name) { + this.name = name + "-" + getClass().getSimpleName(); + Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r))); + } + + CompletableFuture<Void> add(WatchRequestTypeProto watch) { + final WatchQueue queue = queues.get(watch.getReplication()); + if (watch.getIndex() > queue.getIndex()) { // compare without synchronization + final PendingWatch pending = new PendingWatch(watch); + if (queue.offer(pending)) { + return pending.getFuture(); + } + } + return CompletableFuture.completedFuture(null); + } + + void update(ReplicationLevel replication, final long newIndex) { + final WatchQueue queue = queues.get(replication); + if (newIndex > queue.getIndex()) { // compare without synchronization + queue.updateIndex(newIndex); + } + } + + void failWatches(Exception e) { + queues.values().forEach(q -> q.failAll(e)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java new file mode 100644 index 0000000..5014150 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + 
} + + static final int NUM_SERVERS = 3; + static final int GET_TIMEOUT_SECOND = 5; + + @Before + public void setup() { + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + RaftClientConfigKeys.Rpc.setRetryInterval(p, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS)); + } + + @Test + public void testWatchRequestAsync() throws Exception { + LOG.info("Running testWatchRequests"); + try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { + cluster.start(); + runTestWatchRequestAsync(cluster, LOG); + } + } + + static void runTestWatchRequestAsync(MiniRaftCluster cluster, Logger LOG) throws Exception { + try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); + final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); + final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); + final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) { + long logIndex; + { + // send the first message + final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("message"); + final RaftClientReply reply = writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertTrue(reply.isSuccess()); + logIndex = reply.getLogIndex(); + + final List<CompletableFuture<Void>> futures = new ArrayList<>(); + futures.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY) + .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); + futures.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL) + .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); + futures.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED) + .thenAccept(r -> Assert.assertTrue(r.isSuccess()))); + JavaUtils.allOf(futures).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS); + } + logIndex++; + + for(int i = 0; i < 5; i++) { + final int numMessages = ThreadLocalRandom.current().nextInt(10) + 1; + runTestWatchRequestAsync(logIndex, numMessages, writeClient, watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG); + logIndex += numMessages; + } + + LOG.info(cluster.printServers()); + } + } + + static void runTestWatchRequestAsync(long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + MiniRaftCluster cluster, Logger LOG) throws Exception { + LOG.info("runTestWatchRequestAsync: startLogIndex={}, numMessages={}", startLogIndex, numMessages); + + // blockStartTransaction of the leader so that no transaction can be committed MAJORITY + final RaftServerImpl leader = cluster.getLeader(); + LOG.info("block leader {}", leader.getId()); + SimpleStateMachine4Testing.get(leader).blockStartTransaction(); + + // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED + final List<RaftServerImpl> followers = cluster.getFollowers(); + final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size())); + LOG.info("block follower {}", blockedFollower.getId()); + SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData(); + + // send a message + final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); + + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + final String message = "m" + logIndex; + LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message); + replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message))); + watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); + watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); + } + + Assert.assertEquals(numMessages, replies.size()); + Assert.assertEquals(numMessages, watchMajoritys.size()); + Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchAllCommitteds.size()); + + // since leader is blocked, nothing can be done. + TimeUnit.SECONDS.sleep(1); + assertNotDone(replies); + assertNotDone(watchMajoritys); + assertNotDone(watchAlls); + assertNotDone(watchAllCommitteds); + + // unblock leader so that the transaction can be committed. + SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); + LOG.info("unblock leader {}", leader.getId()); + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex); + final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(logIndex, reply.getLogIndex()); + final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); + Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + } + // but not replicated/committed to all. + TimeUnit.SECONDS.sleep(1); + assertNotDone(watchAlls); + assertNotDone(watchAllCommitteds); + + // unblock follower so that the transaction can be replicated and committed to all. 
+ LOG.info("unblock follower {}", blockedFollower.getId()); + SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); + for(int i = 0; i < numMessages; i++) { + final long logIndex = startLogIndex + i; + LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); + final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); + Assert.assertTrue(watchAllReply.isSuccess()); + + final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchAllCommittedReply({}) = ", logIndex, watchAllCommittedReply); + Assert.assertTrue(watchAllCommittedReply.isSuccess()); + { // check commit infos + final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); + Assert.assertEquals(NUM_SERVERS, commitInfos.size()); + commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); + } + } + } + + static <T> void assertNotDone(List<CompletableFuture<T>> futures) { + futures.forEach(f -> Assert.assertFalse(f.isDone())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index dcb899a..b59988a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -85,7 +85,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { static class Blocking { enum Type { - START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA + 
START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA, FLUSH_STATE_MACHINE_DATA } private final EnumMap<Type, CompletableFuture<Void>> maps = new EnumMap<>(Type.class); @@ -299,6 +299,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override + public CompletableFuture<Void> flushStateMachineData(long index) { + blocking.await(Blocking.Type.FLUSH_STATE_MACHINE_DATA); + return CompletableFuture.completedFuture(null); + } + + @Override public void close() { lifeCycle.checkStateAndClose(() -> { running = false; @@ -324,6 +330,13 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { blocking.unblock(Blocking.Type.WRITE_STATE_MACHINE_DATA); } + public void blockFlushStateMachineData() { + blocking.block(Blocking.Type.FLUSH_STATE_MACHINE_DATA); + } + public void unblockFlushStateMachineData() { + blocking.unblock(Blocking.Type.FLUSH_STATE_MACHINE_DATA); + } + @Override public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { LOG.info("{}: notifySlowness {}, {}", this, group, roleInfoProto);
