Repository: incubator-ratis
Updated Branches:
  refs/heads/master 59de6bd6b -> a82cd7b12


RATIS-234. Add a feature to watch whether a request is replicated/committed to a 
particular ReplicationLevel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a82cd7b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a82cd7b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a82cd7b1

Branch: refs/heads/master
Commit: a82cd7b126f432cb2851e61155042c9f9ecfdb79
Parents: 59de6bd
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Wed Oct 10 16:29:44 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Wed Oct 10 16:29:44 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |   6 +
 .../ratis/client/RaftClientConfigKeys.java      |   3 +
 .../ratis/client/impl/ClientProtoUtils.java     |  17 +-
 .../ratis/client/impl/RaftClientImpl.java       |  27 ++-
 .../apache/ratis/protocol/RaftClientReply.java  |  16 +-
 .../ratis/protocol/RaftClientRequest.java       |  76 +++++---
 .../ratis/protocol/ServerInformationReply.java  |   2 +-
 .../org/apache/ratis/util/PeerProxyMap.java     |   4 +
 .../ratis/grpc/TestWatchRequestWithGrpc.java    |  25 +++
 ratis-proto/src/main/proto/Raft.proto           |  13 ++
 .../apache/ratis/server/impl/LeaderState.java   |  37 +++-
 .../apache/ratis/server/impl/LogAppender.java   |   6 +-
 .../ratis/server/impl/PendingRequest.java       |   6 +-
 .../ratis/server/impl/PendingRequests.java      |  34 ++--
 .../ratis/server/impl/RaftServerImpl.java       |  25 ++-
 .../ratis/server/impl/ServerProtoUtils.java     |   2 +-
 .../apache/ratis/server/impl/WatchRequests.java | 136 +++++++++++++
 .../org/apache/ratis/WatchRequestTests.java     | 193 +++++++++++++++++++
 .../SimpleStateMachine4Testing.java             |  15 +-
 19 files changed, 571 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 5af28bb..0b8fdb0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -66,6 +66,9 @@ public interface RaftClient extends Closeable {
   /** Async call to send the given stale-read message to the given server (not 
the raft service). */
   CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long 
minIndex, RaftPeerId server);
 
+  /** Async call to watch the given index to satisfy the given replication 
level. */
+  CompletableFuture<RaftClientReply> sendWatchAsync(long index, 
ReplicationLevel replication);
+
   /**
    * Send the given message to the raft service.
    * The message may change the state of the service.
@@ -88,6 +91,9 @@ public interface RaftClient extends Closeable {
   /** Send the given stale-read message to the given server (not the raft 
service). */
   RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId 
server) throws IOException;
 
+  /** Watch the given index to satisfy the given replication level. */
+  RaftClientReply sendWatch(long index, ReplicationLevel replication) throws 
IOException;
+
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws 
IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index b54f15c..10fc69d 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -45,6 +45,9 @@ public interface RaftClientConfigKeys {
       return 
getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
           RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog());
     }
+    static void setRetryInterval(RaftProperties properties, TimeDuration 
timeoutDuration) {
+      setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY, 
timeoutDuration);
+    }
 
     String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
     TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, 
TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index f2a1e90..a9e509a 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -76,6 +76,8 @@ public interface ClientProtoUtils {
         return RaftClientRequest.Type.valueOf(p.getRead());
       case STALEREAD:
         return RaftClientRequest.Type.valueOf(p.getStaleRead());
+      case WATCH:
+        return RaftClientRequest.Type.valueOf(p.getWatch());
       default:
         throw new IllegalArgumentException("Unexpected request type: " + 
p.getTypeCase()
             + " in request proto " + p);
@@ -98,8 +100,10 @@ public interface ClientProtoUtils {
   static RaftClientRequestProto toRaftClientRequestProto(
       RaftClientRequest request) {
     final RaftClientRequestProto.Builder b = 
RaftClientRequestProto.newBuilder()
-        .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
-        .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
+        .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
+    if (request.getMessage() != null) {
+      b.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
+    }
 
     final RaftClientRequest.Type type = request.getType();
     switch (type.getTypeCase()) {
@@ -112,6 +116,9 @@ public interface ClientProtoUtils {
       case STALEREAD:
         b.setStaleRead(type.getStaleRead());
         break;
+      case WATCH:
+        b.setWatch(type.getWatch());
+        break;
       default:
         throw new IllegalArgumentException("Unexpected request type: " + 
request.getType()
             + " in request " + request);
@@ -131,13 +138,13 @@ public interface ClientProtoUtils {
         .build();
   }
 
-  static RaftClientReplyProto toRaftClientReplyProto(
-      RaftClientReply reply) {
+  static RaftClientReplyProto toRaftClientReplyProto(RaftClientReply reply) {
     final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
     if (reply != null) {
       
b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(),
           reply.getServerId().toByteString(), reply.getRaftGroupId(),
           reply.getCallId(), reply.isSuccess()));
+      b.setLogIndex(reply.getLogIndex());
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
       }
@@ -223,7 +230,7 @@ public interface ClientProtoUtils {
     return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
         groupId, rp.getCallId(), rp.getSuccess(),
         toMessage(replyProto.getMessage()), e,
-        replyProto.getCommitInfosList());
+        replyProto.getLogIndex(), replyProto.getCommitInfosList());
   }
 
   static ServerInformationReply toServerInformationReply(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index aa4e9c8..93c5caa 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -40,6 +40,7 @@ import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
+import static 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.WATCH;
 
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
@@ -149,14 +150,21 @@ final class RaftClientImpl implements RaftClient {
     return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
   }
 
+  @Override
+  public CompletableFuture<RaftClientReply> sendWatchAsync(long index, 
ReplicationLevel replication) {
+    return sendAsync(RaftClientRequest.watchRequestType(index, replication), 
null, null);
+  }
+
   private CompletableFuture<RaftClientReply> sendAsync(
       RaftClientRequest.Type type, Message message, RaftPeerId server) {
-    Objects.requireNonNull(message, "message == null");
+    if (!type.is(WATCH)) {
+      Objects.requireNonNull(message, "message == null");
+    }
     try {
       asyncRequestSemaphore.acquire();
     } catch (InterruptedException e) {
       throw new CompletionException(IOUtils.toInterruptedIOException(
-          "Interrupted when sending " + message, e));
+          "Interrupted when sending " + type + ", message=" + message, e));
     }
     final long callId = nextCallId();
     final LongFunction<PendingAsyncRequest> constructor = seqNum -> new 
PendingAsyncRequest(seqNum,
@@ -189,9 +197,16 @@ final class RaftClientImpl implements RaftClient {
     return send(RaftClientRequest.staleReadRequestType(minIndex), message, 
server);
   }
 
+  @Override
+  public RaftClientReply sendWatch(long index, ReplicationLevel replication) 
throws IOException {
+    return send(RaftClientRequest.watchRequestType(index, replication), null, 
null);
+  }
+
   private RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
       throws IOException {
-    Objects.requireNonNull(message, "message == null");
+    if (!type.is(WATCH)) {
+      Objects.requireNonNull(message, "message == null");
+    }
 
     final long callId = nextCallId();
     return sendRequestWithRetry(() -> newRaftClientRequest(
@@ -292,7 +307,11 @@ final class RaftClientImpl implements RaftClient {
       }
       return reply;
     }).exceptionally(e -> {
-      LOG.debug("{}: Failed {} with {}", clientId, request, e);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(clientId + ": Failed " + request, e);
+      } else {
+        LOG.debug("{}: Failed {} with {}", clientId, request, e);
+      }
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof GroupMismatchException) {
         throw new CompletionException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 5175482..4c290ff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -43,18 +43,20 @@ public class RaftClientReply extends RaftClientMessage {
   private final RaftException exception;
   private final Message message;
 
+  private final long logIndex;
   /** The commit information when the reply is created. */
   private final Collection<CommitInfoProto> commitInfos;
 
   public RaftClientReply(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, boolean success, Message message, RaftException exception,
-      Collection<CommitInfoProto> commitInfos) {
+      long logIndex, Collection<CommitInfoProto> commitInfos) {
     super(clientId, serverId, groupId);
     this.success = success;
     this.callId = callId;
     this.message = message;
     this.exception = exception;
+    this.logIndex = logIndex;
     this.commitInfos = commitInfos != null? commitInfos: 
Collections.emptyList();
 
     if (exception != null) {
@@ -68,7 +70,7 @@ public class RaftClientReply extends RaftClientMessage {
 
   public RaftClientReply(RaftClientRequest request, RaftException exception, 
Collection<CommitInfoProto> commitInfos) {
     this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(),
-        request.getCallId(), false, null, exception, commitInfos);
+        request.getCallId(), false, null, exception, 0L, commitInfos);
   }
 
   public RaftClientReply(RaftClientRequest request, 
Collection<CommitInfoProto> commitInfos) {
@@ -77,12 +79,12 @@ public class RaftClientReply extends RaftClientMessage {
 
   public RaftClientReply(RaftClientRequest request, Message message, 
Collection<CommitInfoProto> commitInfos) {
     this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(),
-        request.getCallId(), true, message, null, commitInfos);
+        request.getCallId(), true, message, null, 0L, commitInfos);
   }
 
   public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
     this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
-        reply.getCallId(), false, reply.getMessage(), nre, 
reply.getCommitInfos());
+        reply.getCallId(), false, reply.getMessage(), nre, 
reply.getLogIndex(), reply.getCommitInfos());
   }
 
   /**
@@ -104,11 +106,15 @@ public class RaftClientReply extends RaftClientMessage {
     return callId;
   }
 
+  public long getLogIndex() {
+    return logIndex;
+  }
+
   @Override
   public String toString() {
     return super.toString() + ", cid=" + getCallId() + ", "
         + (isSuccess()? "SUCCESS":  "FAILED " + exception)
-        + ", commits" + ProtoUtils.toString(commitInfos);
+        + ", logIndex=" + getLogIndex() + ", commits" + 
ProtoUtils.toString(commitInfos);
   }
 
   public boolean isSuccess() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index a9fe740..48d203a 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -53,7 +53,11 @@ public class RaftClientRequest extends RaftClientMessage {
         : new 
Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
   }
 
-  /** The type of a request (oneof write, read, staleRead; see the message 
RaftClientRequestProto). */
+  public static Type watchRequestType(long index, ReplicationLevel 
replication) {
+    return new 
Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build());
+  }
+
+  /** The type of a request (oneof write, read, staleRead, watch; see the 
message RaftClientRequestProto). */
   public static class Type {
     public static Type valueOf(WriteRequestTypeProto write) {
       return writeRequestType(write.getReplication());
@@ -68,35 +72,41 @@ public class RaftClientRequest extends RaftClientMessage {
           : new Type(staleRead);
     }
 
+    public static Type valueOf(WatchRequestTypeProto watch) {
+      return watchRequestType(watch.getIndex(), watch.getReplication());
+    }
+
     /**
      * The type case of the proto.
      * Only the corresponding proto (must be non-null) is used.
      * The other protos are ignored.
      */
     private final RaftClientRequestProto.TypeCase typeCase;
-    private final WriteRequestTypeProto write;
-    private final ReadRequestTypeProto read;
-    private final StaleReadRequestTypeProto staleRead;
+    private final Object proto;
+
+    private Type(RaftClientRequestProto.TypeCase typeCase, Object proto) {
+      this.typeCase = Objects.requireNonNull(typeCase, "typeCase == null");
+      this.proto = Objects.requireNonNull(proto, "proto == null");
+    }
 
     private Type(WriteRequestTypeProto write) {
-      this.typeCase = WRITE;
-      this.write = Objects.requireNonNull(write);
-      this.read = null;
-      this.staleRead = null;
+      this(WRITE, write);
     }
 
     private Type(ReadRequestTypeProto read) {
-      this.typeCase = READ;
-      this.write = null;
-      this.read = Objects.requireNonNull(read);
-      this.staleRead = null;
+      this(READ, read);
     }
 
     private Type(StaleReadRequestTypeProto staleRead) {
-      this.typeCase = STALEREAD;
-      this.write = null;
-      this.read = null;
-      this.staleRead = Objects.requireNonNull(staleRead);
+      this(STALEREAD, staleRead);
+    }
+
+    private Type(WatchRequestTypeProto watch) {
+      this(WATCH, watch);
+    }
+
+    public boolean is(RaftClientRequestProto.TypeCase typeCase) {
+      return getTypeCase().equals(typeCase);
     }
 
     public RaftClientRequestProto.TypeCase getTypeCase() {
@@ -104,30 +114,44 @@ public class RaftClientRequest extends RaftClientMessage {
     }
 
     public WriteRequestTypeProto getWrite() {
-      Preconditions.assertTrue(typeCase == WRITE);
-      return write;
+      Preconditions.assertTrue(is(WRITE));
+      return (WriteRequestTypeProto)proto;
     }
 
     public ReadRequestTypeProto getRead() {
-      Preconditions.assertTrue(typeCase == READ);
-      return read;
+      Preconditions.assertTrue(is(READ));
+      return (ReadRequestTypeProto)proto;
     }
 
     public StaleReadRequestTypeProto getStaleRead() {
-      Preconditions.assertTrue(typeCase == STALEREAD);
-      return staleRead;
+      Preconditions.assertTrue(is(STALEREAD));
+      return (StaleReadRequestTypeProto)proto;
+    }
+
+    public WatchRequestTypeProto getWatch() {
+      Preconditions.assertTrue(is(WATCH));
+      return (WatchRequestTypeProto)proto;
+    }
+
+    static String toString(ReplicationLevel replication) {
+      return replication == ReplicationLevel.MAJORITY? "": "-" + replication;
+    }
+
+    public static String toString(WatchRequestTypeProto w) {
+      return "Watch" + toString(w.getReplication()) + "(" + w.getIndex() + ")";
     }
 
     @Override
     public String toString() {
       switch (typeCase) {
         case WRITE:
-          final ReplicationLevel replication = write.getReplication();
-          return "RW" + (replication == ReplicationLevel.MAJORITY? "": "-" + 
replication);
+          return "RW" + toString(getWrite().getReplication());
         case READ:
           return "RO";
         case STALEREAD:
-          return "StaleRead(" + staleRead.getMinIndex() + ")";
+          return "StaleRead(" + getStaleRead().getMinIndex() + ")";
+        case WATCH:
+          return toString(getWatch());
         default:
           throw new IllegalStateException("Unexpected request type: " + 
typeCase);
       }
@@ -177,7 +201,7 @@ public class RaftClientRequest extends RaftClientMessage {
   }
 
   public boolean is(RaftClientRequestProto.TypeCase typeCase) {
-    return getType().getTypeCase() == typeCase;
+    return getType().is(typeCase);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
index 18bf1c0..45a94ea 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
@@ -43,7 +43,7 @@ public class ServerInformationReply extends RaftClientReply {
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, boolean success, RoleInfoProto roleInfoProto,
       boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, 
RaftGroup group) {
-    super(clientId, serverId, groupId, callId, success, null, null, 
commitInfos);
+    super(clientId, serverId, groupId, callId, success, null, null, 0L, 
commitInfos);
     this.roleInfoProto = roleInfoProto;
     this.isRaftStorageHealthy = isRaftStorageHealthy;
     this.group = group;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java 
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 53a2936..d3d80ef 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -51,6 +51,10 @@ public class PeerProxyMap<PROXY extends Closeable> 
implements Closeable {
       if (proxy == null) {
         synchronized (this) {
           if (proxy == null) {
+            final LifeCycle.State current = lifeCycle.getCurrentState();
+            if (current.isOneOf(LifeCycle.State.CLOSING, 
LifeCycle.State.CLOSED)) {
+              throw new IOException(name + " is already " + current);
+            }
             lifeCycle.startAndTransition(
                 () -> proxy = createProxy.apply(peer), IOException.class);
           }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
new file mode 100644
index 0000000..7b9061b
--- /dev/null
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.WatchRequestTests;
+
+public class TestWatchRequestWithGrpc
+    extends WatchRequestTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index e0916fd..d9a4f08 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -174,8 +174,14 @@ message ClientMessageEntryProto {
 }
 
 enum ReplicationLevel {
+  /** Committed at the leader and replicated to the majority of peers. */
   MAJORITY = 0;
+  /** Committed at the leader and replicated to all peers.
+       Note that ReplicationLevel.ALL implies ReplicationLevel.MAJORITY. */
   ALL = 1;
+  /** Committed at all peers.
+      Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL. */
+  ALL_COMMITTED = 2;
 }
 
 
@@ -197,6 +203,11 @@ message StaleReadRequestTypeProto {
   uint64 minIndex = 1;
 }
 
+message WatchRequestTypeProto {
+  uint64 index = 1;
+  ReplicationLevel replication = 2;
+}
+
 // normal client request
 message RaftClientRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
@@ -206,6 +217,7 @@ message RaftClientRequestProto {
     WriteRequestTypeProto write = 3;
     ReadRequestTypeProto read = 4;
     StaleReadRequestTypeProto staleRead = 5;
+    WatchRequestTypeProto watch = 6;
   }
 }
 
@@ -236,6 +248,7 @@ message RaftClientReplyProto {
     StateMachineExceptionProto stateMachineException = 5;
   }
 
+  uint64 logIndex = 14; // When the request is a write request and the reply 
is successful, the log index of the transaction
   repeated CommitInfoProto commitInfos = 15;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index b4b613e..48f0c1c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -173,6 +175,7 @@ public class LeaderState {
   private final EventQueue eventQueue = new EventQueue();
   private final EventProcessor processor;
   private final PendingRequests pendingRequests;
+  private final WatchRequests watchRequests;
   private volatile boolean running = true;
 
   private final int stagingCatchupGap;
@@ -189,7 +192,8 @@ public class LeaderState {
     this.raftLog = state.getLog();
     this.currentTerm = state.getCurrentTerm();
     processor = new EventProcessor();
-    pendingRequests = new PendingRequests(server);
+    this.pendingRequests = new PendingRequests(server.getId());
+    this.watchRequests = new WatchRequests(server);
 
     final RaftConfiguration conf = server.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
@@ -229,8 +233,12 @@ public class LeaderState {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of 
logSync
     senders.forEach(LogAppender::stopAppender);
+    final NotLeaderException nle = server.generateNotLeaderException();
+    final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
-      pendingRequests.sendNotLeaderResponses();
+      final Collection<TransactionContext> transactions = 
pendingRequests.sendNotLeaderResponses(nle, commitInfos);
+      server.getStateMachine().notifyNotLeader(transactions);
+      watchRequests.failWatches(nle);
     } catch (IOException e) {
       LOG.warn(server.getId() + ": Caught exception in 
sendNotLeaderResponses", e);
     }
@@ -286,6 +294,24 @@ public class LeaderState {
     return pendingRequests.addPendingRequest(index, request, entry);
   }
 
+  CompletableFuture<Void> addWatchReqeust(RaftClientRequest request) {
+    LOG.debug("{}: addWatchRequest {}", server.getId(), request);
+    return watchRequests.add(request.getType().getWatch());
+  }
+
+  void commitIndexChanged() {
+    final long leader = raftLog.getLastCommittedIndex();
+    final long min = senders.stream()
+        .map(LogAppender::getFollower)
+        .map(FollowerInfo::getCommitIndex)
+        .min(Long::compare)
+        .orElse(leader); // it happens only if senders.isEmpty()
+    Preconditions.assertTrue(leader >= min); // leader commit index should 
never be behind any follower's commit index
+
+    watchRequests.update(ReplicationLevel.MAJORITY, leader);
+    watchRequests.update(ReplicationLevel.ALL_COMMITTED, min);
+  }
+
   private void applyOldNewConf() {
     final ServerState state = server.getState();
     final RaftConfiguration current = server.getRaftConf();
@@ -510,10 +536,13 @@ public class LeaderState {
       // the log gets purged after the statemachine does a snapshot
       final TermIndex[] entriesToCommit = raftLog.getEntries(
           oldLastCommitted + 1, majority + 1);
-      server.getState().updateStatemachine(majority, currentTerm);
+      if (server.getState().updateStatemachine(majority, currentTerm)) {
+        commitIndexChanged();
+      }
       checkAndUpdateConfiguration(entriesToCommit);
     }
 
+    watchRequests.update(ReplicationLevel.ALL, min);
     pendingRequests.checkDelayedReplies(min);
   }
 
@@ -533,7 +562,7 @@ public class LeaderState {
       if (conf.isTransitional()) {
         replicateNewConf();
       } else { // the (new) log entry has been committed
-        pendingRequests.replySetConfiguration();
+        pendingRequests.replySetConfiguration(server::getCommitInfos);
         // if the leader is not included in the current configuration, step 
down
         if (!conf.containsInConf(server.getId())) {
           LOG.info("{} is not included in the new configuration {}. Step 
down.",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 4dff3e5..f26a48c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -244,7 +244,7 @@ public class LogAppender {
         throw e;
       } catch (IOException ioe) {
         // TODO should have more detailed retry policy here.
-        if (retry % 10 == 1) { // to reduce the number of messages
+        if (retry++ % 10 == 0) { // to reduce the number of messages
           LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, 
retry++, ioe);
         }
         handleException(ioe);
@@ -257,7 +257,9 @@ public class LogAppender {
   }
 
   protected void updateCommitIndex(long commitIndex) {
-    follower.updateCommitIndex(commitIndex);
+    if (follower.updateCommitIndex(commitIndex)) {
+      server.commitIndexChanged();
+    }
   }
 
   protected class SnapshotRequestIter

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index a95184a..35e3082 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,12 +17,14 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.impl.RetryCache.CacheEntry;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
@@ -111,8 +113,8 @@ public class PendingRequest implements 
Comparable<PendingRequest> {
     setReply(delayed.fail(e));
   }
 
-  TransactionContext setNotLeaderException(NotLeaderException nle) {
-    setReply(new RaftClientReply(getRequest(), nle, null));
+  TransactionContext setNotLeaderException(NotLeaderException nle, 
Collection<CommitInfoProto> commitInfos) {
+    setReply(new RaftClientReply(getRequest(), nle, commitInfos));
     return getEntry();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 9a4ed74..fce1cf1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.impl.RetryCache.CacheEntry;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -26,12 +27,12 @@ import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 class PendingRequests {
@@ -63,11 +64,11 @@ class PendingRequests {
       return r;
     }
 
-    Collection<TransactionContext> setNotLeaderException(NotLeaderException 
nle) {
+    Collection<TransactionContext> setNotLeaderException(NotLeaderException 
nle, Collection<CommitInfoProto> commitInfos) {
       LOG.debug("{}: PendingRequests.setNotLeaderException", name);
       try {
         return map.values().stream()
-            .map(p -> p.setNotLeaderException(nle))
+            .map(p -> p.setNotLeaderException(nle, commitInfos))
             .collect(Collectors.toList());
       } finally {
         map.clear();
@@ -131,16 +132,16 @@ class PendingRequests {
   }
 
   private PendingRequest pendingSetConf;
-  private final RaftServerImpl server;
+  private final String name;
   private final RequestMap pendingRequests;
   private PendingRequest last = null;
 
   private final DelayedReplies delayedReplies;
 
-  PendingRequests(RaftServerImpl server) {
-    this.server = server;
-    this.pendingRequests = new RequestMap(server.getId());
-    this.delayedReplies = new DelayedReplies(server.getId());
+  PendingRequests(RaftPeerId id) {
+    this.name = id + "-" + getClass().getSimpleName();
+    this.pendingRequests = new RequestMap(id);
+    this.delayedReplies = new DelayedReplies(id);
   }
 
   PendingRequest addPendingRequest(long index, RaftClientRequest request,
@@ -169,16 +170,16 @@ class PendingRequests {
     return pendingSetConf;
   }
 
-  void replySetConfiguration() {
+  void replySetConfiguration(Supplier<Collection<CommitInfoProto>> 
getCommitInfos) {
     // we allow the pendingRequest to be null in case that the new leader
     // commits the new configuration while it has not received the retry
     // request from the client
     if (pendingSetConf != null) {
       final RaftClientRequest request = pendingSetConf.getRequest();
-      LOG.debug("{}: sends success for {}", server.getId(), request);
+      LOG.debug("{}: sends success for {}", name, request);
       // for setConfiguration we do not need to wait for statemachine. send 
back
       // reply after it's committed.
-      pendingSetConf.setReply(new RaftClientReply(request, 
server.getCommitInfos()));
+      pendingSetConf.setReply(new RaftClientReply(request, 
getCommitInfos.get()));
       pendingSetConf = null;
     }
   }
@@ -217,16 +218,15 @@ class PendingRequests {
    * The leader state is stopped. Send NotLeaderException to all the pending
    * requests since they have not got applied to the state machine yet.
    */
-  void sendNotLeaderResponses() throws IOException {
-    LOG.info("{}: sendNotLeaderResponses", server.getId());
+  Collection<TransactionContext> sendNotLeaderResponses(NotLeaderException 
nle, Collection<CommitInfoProto> commitInfos) {
+    LOG.info("{}: sendNotLeaderResponses", name);
 
-    // notify the state machine about stepping down
-    final NotLeaderException nle = server.generateNotLeaderException();
-    
server.getStateMachine().notifyNotLeader(pendingRequests.setNotLeaderException(nle));
+    final Collection<TransactionContext> transactions = 
pendingRequests.setNotLeaderException(nle, commitInfos);
     if (pendingSetConf != null) {
-      pendingSetConf.setNotLeaderException(nle);
+      pendingSetConf.setNotLeaderException(nle, commitInfos);
     }
     delayedReplies.failReplies();
+    return transactions;
   }
 
   void checkDelayedReplies(long allAckedIndex) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2d7f85a..a1c1192 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -520,6 +520,10 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       return processQueryFuture(stateMachine.query(request.getMessage()), 
request);
     }
 
+    if (request.is(RaftClientRequestProto.TypeCase.WATCH)) {
+      return watchAsync(request);
+    }
+
     // query the retry cache
     RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
         request.getClientId(), request.getCallId());
@@ -543,6 +547,13 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return appendTransaction(request, context, cacheEntry);
   }
 
+  /**
+   * Handle a WATCH request.
+   *
+   * If this server currently has a leader state, the request is registered with it
+   * and the returned future yields a reply built from the request and the commit infos;
+   * note that getCommitInfos() is evaluated inside the thenApply callback,
+   * i.e. when the watch future completes, not when the request is received.
+   * If there is no leader state, an already-completed future is returned whose reply
+   * carries the exception from generateNotLeaderException() and the current commit infos.
+   *
+   * NOTE(review): if the future returned by the leader state completes exceptionally,
+   * thenApply does not run and the exception propagates to the caller as is -- confirm
+   * that callers expect this.
+   */
+  private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest 
request) {
+    return role.getLeaderState()
+        .map(ls -> ls.addWatchReqeust(request).thenApply(v -> new 
RaftClientReply(request, getCommitInfos())))
+        .orElseGet(() -> CompletableFuture.completedFuture(
+            new RaftClientReply(request, generateNotLeaderException(), 
getCommitInfos())));
+  }
+
   private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest 
request) {
     final long minIndex = request.getType().getStaleRead().getMinIndex();
     final long commitIndex = state.getLog().getLastCommittedIndex();
@@ -999,6 +1010,10 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         groupId, term, lastEntry);
   }
 
+  /**
+   * Forward a commit-index change to the leader state.
+   * This is a no-op when this server has no leader state.
+   */
+  void commitIndexChanged() {
+    role.getLeaderState().ifPresent(LeaderState::commitIndexChanged);
+  }
+
   public void submitUpdateCommitEvent() {
     role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent);
   }
@@ -1022,15 +1037,16 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
     }
 
+    final long logIndex = logEntry.getIndex();
     return stateMachineFuture.whenComplete((reply, exception) -> {
       final RaftClientReply r;
       if (exception == null) {
-        r = new RaftClientReply(clientId, serverId, groupId, callId, true, 
reply, null, getCommitInfos());
+        r = new RaftClientReply(clientId, serverId, groupId, callId, true, 
reply, null, logIndex, getCommitInfos());
       } else {
         // the exception is coming from the state machine. wrap it into the
         // reply as a StateMachineException
         final StateMachineException e = new StateMachineException(getId(), 
exception);
-        r = new RaftClientReply(clientId, serverId, groupId, callId, false, 
null, e, getCommitInfos());
+        r = new RaftClientReply(clientId, serverId, groupId, callId, false, 
null, e, logIndex, getCommitInfos());
       }
 
       // update pending request
@@ -1040,7 +1056,7 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         if (isLeader() && leaderState != null) { // is leader and is running
           // For leader, update cache unless the reply is delayed.
           // When a reply is delayed, the cache will be updated in 
DelayedReply.getReply().
-          updateCache = leaderState.replyPendingRequest(logEntry.getIndex(), 
r, cacheEntry);
+          updateCache = leaderState.replyPendingRequest(logIndex, r, 
cacheEntry);
         }
       }
       if (updateCache) {
@@ -1091,7 +1107,8 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, 
logEntry.getCallId());
       if (cacheEntry != null) {
         final RaftClientReply reply = new RaftClientReply(clientId, getId(), 
getGroupId(),
-            logEntry.getCallId(), false, null, generateNotLeaderException(), 
getCommitInfos());
+            logEntry.getCallId(), false, null, generateNotLeaderException(),
+            logEntry.getIndex(), getCommitInfos());
         cacheEntry.failWithReply(reply);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 6fbd43a..c75b6ed 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -77,7 +77,7 @@ public class ServerProtoUtils {
   public static String toString(AppendEntriesReplyProto reply) {
     return toString(reply.getServerReply()) + "," + reply.getResult()
         + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
-        + ",followerCommit" + reply.getFollowerCommit();
+        + ",followerCommit:" + reply.getFollowerCommit();
   }
 
   private static String toString(RaftRpcReplyProto reply) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
new file mode 100644
index 0000000..b7d6635
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Bookkeeping for pending watch requests.
+ *
+ * One {@link WatchQueue} is kept per {@link ReplicationLevel}.  Each queue remembers
+ * the highest index reported so far for its level and holds the watches whose
+ * requested index is still above it.  A watch is completed once
+ * {@link #update(ReplicationLevel, long)} reports an index at or above the watched one,
+ * or failed by {@link #failWatches(Exception)}.
+ */
+class WatchRequests {
+  public static final Logger LOG = LoggerFactory.getLogger(WatchRequests.class);
+
+  /** A watch request that has not been satisfied yet, together with the future to complete. */
+  static class PendingWatch {
+    private final WatchRequestTypeProto watch;
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    PendingWatch(WatchRequestTypeProto watch) {
+      this.watch = watch;
+    }
+
+    /** @return the future completed when the watched index is reached, or failed by failAll. */
+    CompletableFuture<Void> getFuture() {
+      return future;
+    }
+
+    /** @return the index being watched. */
+    long getIndex() {
+      return watch.getIndex();
+    }
+
+    @Override
+    public String toString() {
+      return RaftClientRequest.Type.toString(watch);
+    }
+  }
+
+  /**
+   * The pending watches of a single replication level, ordered by watched index.
+   * All mutations are synchronized on the queue; the index is volatile so that
+   * callers may do a cheap unsynchronized pre-check before taking the lock.
+   */
+  private class WatchQueue {
+    private final ReplicationLevel replication;
+    private final PriorityQueue<PendingWatch> q = new PriorityQueue<>(Comparator.comparingLong(PendingWatch::getIndex));
+    /** Invariant: q is empty, or index is smaller than the index of every element in q. */
+    private volatile long index;
+
+    WatchQueue(ReplicationLevel replication) {
+      this.replication = replication;
+    }
+
+    long getIndex() {
+      return index;
+    }
+
+    /**
+     * Enqueue the given watch unless its index has already been reached.
+     *
+     * @return true if the watch was enqueued; false if its index is not above the current index.
+     */
+    synchronized boolean offer(PendingWatch pending) {
+      // Re-check under the lock: the index may have advanced since the caller looked.
+      if (pending.getIndex() <= getIndex()) {
+        return false;
+      }
+      Preconditions.assertTrue(q.offer(pending));
+      return true;
+    }
+
+    /** Advance the index to newIndex, if larger, and complete every watch at or below it. */
+    synchronized void updateIndex(final long newIndex) {
+      // Re-check under the lock: another thread may already have advanced the index.
+      final long oldIndex = getIndex();
+      if (newIndex <= oldIndex) {
+        return;
+      }
+      LOG.debug("{}: update {} index from {} to {}", name, replication, oldIndex, newIndex);
+      index = newIndex;
+
+      // The queue is ordered by index, so stop at the first watch that is still ahead.
+      PendingWatch head = q.peek();
+      while (head != null && head.getIndex() <= newIndex) {
+        final PendingWatch removed = q.poll();
+        Preconditions.assertTrue(removed == head);
+        LOG.debug("{}: complete {}", name, removed);
+        removed.getFuture().complete(null);
+        head = q.peek();
+      }
+    }
+
+    /** Drain the queue, completing every pending watch exceptionally with e. */
+    synchronized void failAll(Exception e) {
+      PendingWatch pending;
+      while ((pending = q.poll()) != null) {
+        pending.getFuture().completeExceptionally(e);
+      }
+    }
+  }
+
+  private final String name;
+  private final Map<ReplicationLevel, WatchQueue> queues = new EnumMap<>(ReplicationLevel.class);
+
+  /** @param name a prefix used in log messages; the simple class name is appended to it. */
+  WatchRequests(Object name) {
+    this.name = name + "-" + getClass().getSimpleName();
+    for (ReplicationLevel level : ReplicationLevel.values()) {
+      queues.put(level, new WatchQueue(level));
+    }
+  }
+
+  /**
+   * Register a watch.
+   *
+   * @return a pending future if the watched index has not been reached for the requested
+   *         replication level; otherwise an already-completed future.
+   */
+  CompletableFuture<Void> add(WatchRequestTypeProto watch) {
+    final WatchQueue queue = queues.get(watch.getReplication());
+    // Compare without synchronization first; offer(..) compares again under the lock.
+    if (watch.getIndex() <= queue.getIndex()) {
+      return CompletableFuture.completedFuture(null);
+    }
+    final PendingWatch pending = new PendingWatch(watch);
+    return queue.offer(pending) ? pending.getFuture() : CompletableFuture.completedFuture(null);
+  }
+
+  /** Report that the given replication level has reached newIndex. */
+  void update(ReplicationLevel replication, final long newIndex) {
+    final WatchQueue queue = queues.get(replication);
+    // Compare without synchronization first; updateIndex(..) compares again under the lock.
+    if (newIndex <= queue.getIndex()) {
+      return;
+    }
+    queue.updateIndex(newIndex);
+  }
+
+  /** Fail all pending watches of every replication level with the given exception. */
+  void failWatches(Exception e) {
+    for (WatchQueue queue : queues.values()) {
+      queue.failAll(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
new file mode 100644
index 0000000..5014150
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for watch requests: a client asks to be notified once a log index has reached
+ * a given {@link ReplicationLevel} (MAJORITY, ALL or ALL_COMMITTED).
+ *
+ * The test blocks the leader's startTransaction and one follower's flushStateMachineData
+ * through {@link SimpleStateMachine4Testing}, and then verifies that the watch futures
+ * complete only after the corresponding party has been unblocked.
+ *
+ * @param <CLUSTER> the concrete {@link MiniRaftCluster} implementation under test.
+ */
+public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  static final int NUM_SERVERS = 3;
+  /** Timeout, in seconds, used for every bounded wait on a reply future. */
+  static final int GET_TIMEOUT_SECOND = 5;
+
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftClientConfigKeys.Rpc.setRetryInterval(p, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
+  }
+
+  @Test
+  public void testWatchRequestAsync() throws Exception {
+    LOG.info("Running testWatchRequestAsync");
+    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
+      cluster.start();
+      runTestWatchRequestAsync(cluster, LOG);
+    }
+  }
+
+  /**
+   * Send one message and watch it at all three replication levels without blocking anything,
+   * then run five rounds of the blocking scenario with a random number (1 to 10) of messages each.
+   */
+  static void runTestWatchRequestAsync(MiniRaftCluster cluster, Logger LOG) throws Exception {
+    // A separate client is used for the writes and for each replication level being watched.
+    try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
+        final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
+        final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
+        final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
+      long logIndex;
+      {
+        // send the first message
+        final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("message");
+        final RaftClientReply reply = writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+        Assert.assertTrue(reply.isSuccess());
+        logIndex = reply.getLogIndex();
+
+        // nothing is blocked, so all three watches must succeed within the timeout
+        final List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)
+            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
+        futures.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)
+            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
+        futures.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)
+            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
+        JavaUtils.allOf(futures).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      }
+      logIndex++;
+
+      for(int i = 0; i < 5; i++) {
+        final int numMessages = ThreadLocalRandom.current().nextInt(10) + 1;
+        runTestWatchRequestAsync(logIndex, numMessages, writeClient, watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG);
+        logIndex += numMessages;
+      }
+
+      LOG.info(cluster.printServers());
+    }
+  }
+
+  /**
+   * Run one round of the blocking scenario.
+   *
+   * The test expects the i-th message sent here to get log index startLogIndex + i;
+   * this is asserted on the write replies.
+   *
+   * @param startLogIndex the log index expected for the first message of this round.
+   * @param numMessages the number of messages (and watches per replication level) to send.
+   */
+  static void runTestWatchRequestAsync(long startLogIndex, int numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient,
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
+    LOG.info("runTestWatchRequestAsync: startLogIndex={}, numMessages={}", startLogIndex, numMessages);
+
+    // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
+    final RaftServerImpl leader = cluster.getLeader();
+    LOG.info("block leader {}", leader.getId());
+    SimpleStateMachine4Testing.get(leader).blockStartTransaction();
+
+    // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
+    final List<RaftServerImpl> followers = cluster.getFollowers();
+    final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+    LOG.info("block follower {}", blockedFollower.getId());
+    SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
+
+    // send the messages together with one watch per replication level for each of them
+    final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>();
+
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      final String message = "m" + logIndex;
+      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, message);
+      replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message)));
+      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY));
+      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL));
+      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED));
+    }
+
+    Assert.assertEquals(numMessages, replies.size());
+    Assert.assertEquals(numMessages, watchMajoritys.size());
+    Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchAllCommitteds.size());
+
+    // since leader is blocked, nothing can be done.
+    TimeUnit.SECONDS.sleep(1);
+    assertNotDone(replies);
+    assertNotDone(watchMajoritys);
+    assertNotDone(watchAlls);
+    assertNotDone(watchAllCommitteds);
+
+    // unblock leader so that the transaction can be committed.
+    SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
+    LOG.info("unblock leader {}", leader.getId());
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex);
+      final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(logIndex, reply.getLogIndex());
+      final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
+      // check the reply obtained above with a timeout instead of calling the untimed get() again
+      Assert.assertTrue(watchMajorityReply.isSuccess());
+    }
+    // but not replicated/committed to all.
+    TimeUnit.SECONDS.sleep(1);
+    assertNotDone(watchAlls);
+    assertNotDone(watchAllCommitteds);
+
+    // unblock follower so that the transaction can be replicated and committed to all.
+    LOG.info("unblock follower {}", blockedFollower.getId());
+    SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
+      final RaftClientReply watchAllReply = watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
+      Assert.assertTrue(watchAllReply.isSuccess());
+
+      final RaftClientReply watchAllCommittedReply = watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      // the format needs a second placeholder, otherwise the reply itself is never logged
+      LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply);
+      Assert.assertTrue(watchAllCommittedReply.isSuccess());
+      { // check commit infos
+        final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos();
+        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
+        commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex()));
+      }
+    }
+  }
+
+  /** Assert that none of the given futures has completed yet. */
+  static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
+    futures.forEach(f -> Assert.assertFalse(f.isDone()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a82cd7b1/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index dcb899a..b59988a 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -85,7 +85,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
 
   static class Blocking {
     enum Type {
-      START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA
+      START_TRANSACTION, READ_STATE_MACHINE_DATA, WRITE_STATE_MACHINE_DATA, 
FLUSH_STATE_MACHINE_DATA
     }
 
     private final EnumMap<Type, CompletableFuture<Void>> maps = new 
EnumMap<>(Type.class);
@@ -299,6 +299,12 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   @Override
+  public CompletableFuture<Void> flushStateMachineData(long index) {
+    // Test hook: consult the FLUSH_STATE_MACHINE_DATA blocking point before reporting the flush.
+    // NOTE(review): presumably await(..) does not return while the type is blocked -- confirm in Blocking.
+    blocking.await(Blocking.Type.FLUSH_STATE_MACHINE_DATA);
+    // The index parameter is not used; an already-completed future is returned.
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
   public void close() {
     lifeCycle.checkStateAndClose(() -> {
       running = false;
@@ -324,6 +330,13 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     blocking.unblock(Blocking.Type.WRITE_STATE_MACHINE_DATA);
   }
 
+  /** Block the FLUSH_STATE_MACHINE_DATA blocking point of this state machine. */
+  public void blockFlushStateMachineData() {
+    blocking.block(Blocking.Type.FLUSH_STATE_MACHINE_DATA);
+  }
+  /** Unblock the FLUSH_STATE_MACHINE_DATA blocking point of this state machine. */
+  public void unblockFlushStateMachineData() {
+    blocking.unblock(Blocking.Type.FLUSH_STATE_MACHINE_DATA);
+  }
+
   @Override
   public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     LOG.info("{}: notifySlowness {}, {}", this, group, roleInfoProto);

Reply via email to