Repository: incubator-ratis
Updated Branches:
  refs/heads/master 5de0fa647 -> 1f07109d7


RATIS-141. In RaftClientProtocolService, the assumption of consecutive callId 
is invalid. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1f07109d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1f07109d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1f07109d

Branch: refs/heads/master
Commit: 1f07109d71c683455af484e16d3966b18d415c45
Parents: 5de0fa6
Author: Chen Liang <[email protected]>
Authored: Tue Nov 28 15:17:32 2017 -0800
Committer: Chen Liang <[email protected]>
Committed: Tue Nov 28 15:17:32 2017 -0800

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     | 42 +++++++++++++----
 .../ratis/client/impl/RaftClientImpl.java       | 10 ++++-
 .../ratis/protocol/RaftClientRequest.java       | 18 ++++++--
 .../java/org/apache/ratis/util/StringUtils.java | 16 +++++++
 .../ratis/examples/ParameterizedBaseTest.java   |  8 +++-
 .../TestRaftStateMachineException.java          |  6 ++-
 .../org/apache/ratis/grpc/RaftGrpcUtil.java     | 11 ++++-
 .../ratis/grpc/client/AppendStreamer.java       | 29 ++++++------
 .../grpc/client/RaftClientProtocolService.java  | 47 +++++++++++---------
 .../org/apache/ratis/grpc/TestRaftStream.java   | 33 +++++++-------
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |  5 ---
 ratis-proto-shaded/src/main/proto/Raft.proto    | 13 +++---
 .../ratis/server/impl/PendingRequests.java      |  2 +-
 .../ratis/server/impl/RaftServerConstants.java  |  1 +
 .../ratis/server/impl/ServerProtoUtils.java     |  3 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  4 +-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 32 ++++++++++---
 .../java/org/apache/ratis/RaftBasicTests.java   | 13 +++---
 .../java/org/apache/ratis/RetryCacheTests.java  | 10 +++--
 19 files changed, 203 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index a01c376..2d7c13e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -22,6 +22,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
+import org.apache.ratis.util.StringUtils;
 
 import java.util.Arrays;
 
@@ -42,18 +43,19 @@ public class ClientProtoUtils {
   }
 
   public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId) {
+      ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId, long seqNum) {
     return RaftRpcRequestProto.newBuilder()
         .setRequestorId(requesterId)
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
-        .setCallId(callId);
+        .setCallId(callId)
+        .setSeqNum(seqNum);
   }
 
   public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId) {
+      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, long seqNum) {
     return toRaftRpcRequestProtoBuilder(
-        requesterId.toByteString(), replyId.toByteString(), groupId, callId);
+        requesterId.toByteString(), replyId.toByteString(), groupId, callId, seqNum);
   }
 
   private static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
@@ -62,7 +64,8 @@ public class ClientProtoUtils {
         request.getClientId(),
         request.getServerId(),
         request.getRaftGroupId(),
-        request.getCallId());
+        request.getCallId(),
+        request.getSeqNum());
   }
 
   public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
@@ -72,6 +75,7 @@ public class ClientProtoUtils {
         RaftPeerId.valueOf(request.getReplyId()),
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         request.getCallId(),
+        request.getSeqNum(),
         toMessage(p.getMessage()), p.getReadOnly());
   }
 
@@ -86,10 +90,10 @@ public class ClientProtoUtils {
 
   public static RaftClientRequestProto toRaftClientRequestProto(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
-      ByteString content, boolean readOnly) {
+      long seqNum, ByteString content, boolean readOnly) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
-            clientId, serverId, groupId, callId))
+            clientId, serverId, groupId, callId, seqNum))
         .setMessage(toClientMessageEntryProtoBuilder(content))
         .setReadOnly(readOnly)
         .build();
@@ -204,7 +208,17 @@ public class ClientProtoUtils {
   }
 
   private static Message toMessage(final ClientMessageEntryProto p) {
-    return p::getContent;
+    return new Message() {
+      @Override
+      public ByteString getContent() {
+        return p.getContent();
+      }
+
+      @Override
+      public String toString() {
+        return StringUtils.bytes2HexShortString(getContent());
+      }
+    };
   }
 
   private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) {
@@ -270,4 +284,16 @@ public class ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
         .build();
   }
+
+  public static String toString(RaftClientRequestProto proto) {
+    final RaftRpcRequestProto rpc = proto.getRpcRequest();
+    return ClientId.valueOf(rpc.getRequestorId()) + "->" + rpc.getReplyId().toStringUtf8()
+        + "#" + rpc.getCallId() + "-" + rpc.getSeqNum();
+  }
+
+  public static String toString(RaftClientReplyProto proto) {
+    final RaftRpcReplyProto rpc = proto.getRpcReply();
+    return ClientId.valueOf(rpc.getRequestorId()) + "<-" + rpc.getReplyId().toStringUtf8()
+        + "#" + rpc.getCallId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 906d55f..9c66a9f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -51,6 +51,7 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
+  private final AtomicLong asyncSeqNum = new AtomicLong();
   private final ScheduledExecutorService scheduler;
   private final Semaphore asyncRequestSemaphore;
 
@@ -68,6 +69,10 @@ final class RaftClientImpl implements RaftClient {
     clientRpc.addServers(peers);
   }
 
+  private long nextSeqNum() {
+    return asyncSeqNum.getAndIncrement() & Long.MAX_VALUE;
+  }
+
   @Override
   public ClientId getId() {
     return clientId;
@@ -93,8 +98,9 @@ final class RaftClientImpl implements RaftClient {
           "Interrupted when sending " + message, e));
     }
     final long callId = nextCallId();
+    final long seqNum = nextSeqNum();
     return sendRequestWithRetryAsync(
-        () -> new RaftClientRequest(clientId, leaderId, groupId, callId, message, readOnly)
+        () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly)
     ).thenApply(reply -> {
       if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) {
         throw new CompletionException(reply.getException());
@@ -118,7 +124,7 @@ final class RaftClientImpl implements RaftClient {
 
     final long callId = nextCallId();
     return sendRequestWithRetry(() -> new RaftClientRequest(
-        clientId, leaderId, groupId, callId, message, readOnly));
+        clientId, leaderId, groupId, callId, 0L, message, readOnly));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 63e482d..c924ef8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -22,18 +22,26 @@ package org.apache.ratis.protocol;
  */
 public class RaftClientRequest extends RaftClientMessage {
   private final long callId;
+  private final long seqNum;
+
   private final Message message;
   private final boolean readOnly;
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, Message message) {
-    this(clientId, serverId, groupId, callId, message, false);
+    this(clientId, serverId, groupId, callId, 0L, message, false);
+  }
+
+  public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
+       RaftGroupId groupId, long callId, long seqNum, Message message) {
+    this(clientId, serverId, groupId, callId, seqNum, message, false);
   }
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId, Message message, boolean readOnly) {
+       RaftGroupId groupId, long callId, long seqNum, Message message, boolean readOnly) {
     super(clientId, serverId, groupId);
     this.callId = callId;
+    this.seqNum = seqNum;
     this.message = message;
     this.readOnly = readOnly;
   }
@@ -47,6 +55,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return callId;
   }
 
+  public long getSeqNum() {
+    return seqNum;
+  }
+
   public Message getMessage() {
     return message;
   }
@@ -57,7 +69,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", callId: " + callId + ", "
+    return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
         + (isReadOnly()? "RO": "RW") + ", " + getMessage();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 07a8973..0212a48 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
 
 import org.apache.ratis.shaded.com.google.common.collect.Interner;
 import org.apache.ratis.shaded.com.google.common.collect.Interners;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -64,6 +65,21 @@ public class StringUtils {
     return String.format(Locale.ENGLISH, format, objects);
   }
 
+  public static String bytes2HexShortString(ByteString bytes) {
+    final int size = bytes.size();
+    if (size > 10) {
+      // return only the first 10 bytes
+      return bytes2HexString(bytes.substring(0, 10)) + "...(size=" + size + ")";
+    } else {
+      return bytes2HexString(bytes);
+    }
+  }
+
+  public static String bytes2HexString(ByteString bytes) {
+    Objects.requireNonNull(bytes, "bytes == null");
+    return bytes2HexString(bytes.asReadOnlyByteBuffer());
+  }
+
   public static String bytes2HexString(byte[] bytes) {
     Objects.requireNonNull(bytes, "bytes == null");
     return bytes2HexString(ByteBuffer.wrap(bytes));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index de9fa6a..7cf74ea 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -38,9 +38,15 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 @RunWith(Parameterized.class)
-public class ParameterizedBaseTest extends BaseTest {
+public abstract class ParameterizedBaseTest extends BaseTest {
   public static final Logger LOG = LoggerFactory.getLogger(ParameterizedBaseTest.class);
 
+  /** Subclasses should override this method to provide real data parameters. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return Collections.emptyList();
+  }
+
   /** For {@link Parameterized} test so that a cluster can be shared by multiple {@link Test} */
   private static final AtomicReference<MiniRaftCluster> currentCluster = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index 6eaa3ea..158875d 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -100,8 +100,9 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
+    final long seqNum = 111;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, new SimpleMessage("message"));
+        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -141,8 +142,9 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
+    final long seqNum = 111;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, new SimpleMessage("message"));
+        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertTrue(reply.hasStateMachineException());
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index bae3682..d373ddb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -17,20 +17,20 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.shaded.io.grpc.Metadata;
 import org.apache.ratis.shaded.io.grpc.Status;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.util.CheckedSupplier;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.ReflectionUtils;
 import org.apache.ratis.util.StringUtils;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.Function;
 
 public interface RaftGrpcUtil {
@@ -38,6 +38,13 @@ public interface RaftGrpcUtil {
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
 
   static StatusRuntimeException wrapException(Throwable t) {
+    Objects.requireNonNull(t, "t == null");
+    if (t instanceof CompletionException) {
+      if (t.getCause() != null) {
+        t = t.getCause();
+      }
+    }
+
     Metadata trailers = new Metadata();
     trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
     return new StatusRuntimeException(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 16095ef..9c238f4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -156,7 +156,7 @@ public class AppendStreamer implements Closeable {
     if (isRunning()) {
       // wrap the current buffer into a RaftClientRequestProto
       final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
-          clientId, leaderId, groupId, seqNum, content, false);
+          clientId, leaderId, groupId, seqNum, seqNum, content, false);
       if (request.getSerializedSize() > maxMessageSize) {
         throw new IOException("msg size:" + request.getSerializedSize() +
             " exceeds maximum:" + maxMessageSize);
@@ -219,6 +219,7 @@ public class AppendStreamer implements Closeable {
             }
           }
           if (running == RunningState.RUNNING) {
+            Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty");
             RaftClientRequestProto next = dataQueue.poll();
             leaderProxy.onNext(next);
             ackQueue.offer(next);
@@ -261,14 +262,14 @@ public class AppendStreamer implements Closeable {
         return;
       }
       synchronized (AppendStreamer.this) {
-        RaftClientRequestProto pending = Objects.requireNonNull(
-            ackQueue.peek());
+        RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek());
         if (reply.getRpcReply().getSuccess()) {
-          Preconditions.assertTrue(pending.getRpcRequest().getCallId() ==
-              reply.getRpcReply().getCallId());
+          Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(),
+              () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply));
           ackQueue.poll();
-          LOG.trace("{} received success ack for request {}", this,
-              pending.getRpcRequest());
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending));
+          }
           // we've identified the correct leader
           if (running == RunningState.LOOK_FOR_LEADER) {
             running = RunningState.RUNNING;
@@ -288,6 +289,7 @@ public class AppendStreamer implements Closeable {
 
     @Override
     public void onError(Throwable t) {
+      LOG.warn(this + " onError", t);
       if (active) {
         synchronized (AppendStreamer.this) {
           handleError(t, this);
@@ -361,14 +363,11 @@ public class AppendStreamer implements Closeable {
     if (isRunning()) {
       // resend all the pending requests
       while (!ackQueue.isEmpty()) {
-        RaftClientRequestProto oldRequest = ackQueue.pollLast();
-        RaftRpcRequestProto r = oldRequest.getRpcRequest();
-        RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
-            .setMessage(oldRequest.getMessage())
-            .setReadOnly(oldRequest.getReadOnly())
-            .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(
-                clientId, newLeader, groupId, r.getCallId()))
-            .build();
+        final RaftClientRequestProto oldRequest = ackQueue.pollLast();
+        final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
+            .setReplyId(newLeader.toByteString());
+        final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest)
+            .setRpcRequest(newRpc).build();
         dataQueue.offerFirst(newRequest);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 5d68d42..f3ebe0f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -19,10 +19,7 @@ package org.apache.ratis.grpc.client;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
@@ -33,18 +30,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
   static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
 
   private static class PendingAppend implements Comparable<PendingAppend> {
-    private final long callId;
+    private final RaftClientRequest request;
     private volatile RaftClientReply reply;
 
-    PendingAppend(long callId) {
-      this.callId = callId;
+    PendingAppend(RaftClientRequest request) {
+      this.request = request;
     }
 
     boolean isReady() {
@@ -55,17 +51,25 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
       this.reply = reply;
     }
 
+    RaftClientRequest getRequest() {
+      return request;
+    }
+
+    long getSeqNum() {
+      return request != null? request.getSeqNum(): Long.MAX_VALUE;
+    }
+
     @Override
-    public int compareTo(PendingAppend p) {
-      return callId == p.callId ? 0 : (callId < p.callId ? -1 : 1);
+    public int compareTo(PendingAppend that) {
+      return Long.compare(this.getSeqNum(), that.getSeqNum());
     }
 
     @Override
     public String toString() {
-      return callId + ", reply:" + (reply == null ? "null" : reply.toString());
+      return request != null? getSeqNum() + ":" + reply: "COMPLETED";
     }
   }
-  private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);
+  private static final PendingAppend COMPLETED = new PendingAppend(null);
 
   private final Supplier<RaftPeerId> idSupplier;
   private final RaftClientAsynchronousProtocol protocol;
@@ -105,31 +109,30 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
     @Override
     public void onNext(RaftClientRequestProto request) {
       try {
-        PendingAppend p = new PendingAppend(request.getRpcRequest().getCallId());
+        final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
+        final PendingAppend p = new PendingAppend(r);
+        final long replySeq = p.getSeqNum();
         synchronized (pendingList) {
           pendingList.add(p);
         }
 
-        CompletableFuture<RaftClientReply> future = protocol.submitClientRequestAsync(
-            ClientProtoUtils.toRaftClientRequest(request));
-        future.whenCompleteAsync((reply, exception) -> {
+        protocol.submitClientRequestAsync(r
+        ).whenCompleteAsync((reply, exception) -> {
           if (exception != null) {
             // TODO: the exception may be from either raft or state machine.
             // Currently we skip all the following responses when getting an
             // exception from the state machine.
             responseObserver.onError(RaftGrpcUtil.wrapException(exception));
           } else {
-            final long replySeq = reply.getCallId();
             synchronized (pendingList) {
               Preconditions.assertTrue(!pendingList.isEmpty(),
-                  "PendingList is empty when handling onNext for callId %s",
-                  replySeq);
-              final long headSeqNum = pendingList.get(0).callId;
-              // we assume the callId is consecutive for a stream RPC call
+                  "PendingList is empty when handling onNext for seqNum %s", replySeq);
+              final long headSeqNum = pendingList.get(0).getSeqNum();
+              // stream seqNum is consecutive
               final PendingAppend pendingForReply = pendingList.get(
                   (int) (replySeq - headSeqNum));
               Preconditions.assertTrue(pendingForReply != null &&
-                      pendingForReply.callId == replySeq,
+                      pendingForReply.getSeqNum() == replySeq,
                   "pending for reply is: %s, the pending list: %s",
                   pendingForReply, pendingList);
               pendingForReply.setReply(reply);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index b9df622..de3177c 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -21,14 +21,16 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.grpc.client.AppendStreamer;
 import org.apache.ratis.grpc.client.RaftOutputStream;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.StringUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,6 +41,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -75,7 +78,8 @@ public class TestRaftStream extends BaseTest {
 
   @Test
   public void testSimpleWrite() throws Exception {
-    LOG.info("Running testSimpleWrite");
+    final int numRequests = 500;
+    LOG.info("Running testSimpleWrite, numRequests=" + numRequests);
 
     // default 64K is too large for a test
     GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
@@ -84,20 +88,17 @@ public class TestRaftStream extends BaseTest {
     cluster.start();
     RaftServerImpl leader = waitForLeader(cluster);
 
-    final Random r = new Random();
-    final long seed = r.nextLong();
-    r.setSeed(seed);
     try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
         cluster.getGroup(), leader.getId())) {
-      for (int i = 0; i < 500; i++) { // generate 500 requests
-        out.write(toBytes(r.nextInt()));
+      for (int i = 0; i < numRequests; i++) { // generate requests
+        out.write(toBytes(i));
       }
     }
 
     // check the leader's raft log
     final RaftLog raftLog = leader.getState().getLog();
-    r.setSeed(seed);
-    checkLog(raftLog, 500, () -> toBytes(r.nextInt()));
+    final AtomicInteger i = new AtomicInteger();
+    checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement()));
   }
 
   private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
@@ -107,10 +108,11 @@ public class TestRaftStream extends BaseTest {
     // check the log content
     TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1);
     for (TermIndex entry : entries) {
-      byte[] logData = raftLog.get(entry.getIndex()).getSmLogEntry().getData().toByteArray();
+      RaftProtos.LogEntryProto log  = raftLog.get(entry.getIndex());
+      byte[] logData = log.getSmLogEntry().getData().toByteArray();
       byte[] expected = s.get();
-      Assert.assertEquals("log entry: " + entry,
-          expected.length, logData.length);
+      LOG.info("log " + entry + " " + log.getLogEntryBodyCase() + " " + StringUtils.bytes2HexString(logData));
+      Assert.assertEquals(expected.length, logData.length);
       Assert.assertArrayEquals(expected, logData);
     }
   }
@@ -262,7 +264,7 @@ public class TestRaftStream extends BaseTest {
     final RaftServerImpl leader = waitForLeader(cluster);
 
     final AtomicBoolean running  = new AtomicBoolean(true);
-    final AtomicBoolean success = new AtomicBoolean(false);
+    final AtomicReference<Boolean> success = new AtomicReference<>();
     final AtomicInteger result = new AtomicInteger(0);
     final CountDownLatch latch = new CountDownLatch(1);
 
@@ -294,6 +296,7 @@ public class TestRaftStream extends BaseTest {
 
     running.set(false);
     latch.await(5, TimeUnit.SECONDS);
+    LOG.info("Writer success? " + success.get());
     Assert.assertTrue(success.get());
     // total number of tx should be >= result + 2, where 2 means two NoOp from
     // leaders. It may be larger than result+2 because the client may resend

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 2657bd1..76a64b3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -56,11 +56,6 @@ public class TestRaftWithGrpc extends RaftBasicTests {
     BlockRequestHandlingInjection.getInstance().unblockAll();
   }
 
-  @Test
-  public void testBasicAppendEntriesAsync() throws Exception {
-    super.testBasicAppendEntries(true);
-  }
-
   @Override
   @Test
   public void testWithLoad() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index a8284fb..fea6bd0 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -76,16 +76,19 @@ message TermIndexProto {
 message RaftRpcRequestProto {
   bytes requestorId = 1;
   bytes replyId = 2;
-  uint64 callId = 3;
-  RaftGroupIdProto raftGroupId = 4;
+  RaftGroupIdProto raftGroupId = 3;
+  uint64 callId = 4;
+
+  uint64 seqNum = 15;
 }
 
 message RaftRpcReplyProto {
   bytes requestorId = 1;
   bytes replyId = 2;
-  uint64 callId = 3;
-  bool success = 4;
-  RaftGroupIdProto raftGroupId = 5;
+  RaftGroupIdProto raftGroupId = 3;
+  uint64 callId = 4;
+
+  bool success = 15;
 }
 
 message FileChunkProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index ab8ad2c..f651230 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -53,7 +53,7 @@ class PendingRequests {
 
     PendingRequest remove(long index) {
       final PendingRequest r = map.remove(index);
-      LOG.debug("{}: PendingRequests.remove{} returns {}", name, index, r);
+      LOG.debug("{}: PendingRequests.remove {} returns {}", name, index, r);
       return r;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index b4db46e..35e3df8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -21,6 +21,7 @@ public interface RaftServerConstants {
   long INVALID_LOG_INDEX = -1;
   byte LOG_TERMINATE_BYTE = 0;
   long DEFAULT_CALLID = 0;
+  long DEFAULT_SEQNUM = 0L;
 
   enum StartupOption {
     FORMAT("format"),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index d660f76..9f84e05 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 import java.util.Arrays;
 import java.util.List;
@@ -131,7 +132,7 @@ public class ServerProtoUtils {
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
     return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
-        requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID);
+        requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, DEFAULT_SEQNUM);
   }
 
   public static RequestVoteRequestProto toRequestVoteRequestProto(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index a6d4bdd..88fddf4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -46,6 +46,7 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 public abstract class MiniRaftCluster {
   public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
@@ -325,6 +326,7 @@ public abstract class MiniRaftCluster {
   }
 
   public void killServer(RaftPeerId id) {
+    LOG.info("killServer " + id);
     servers.get(id).close();
   }
 
@@ -484,7 +486,7 @@ public abstract class MiniRaftCluster {
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, Message message) {
     return new RaftClientRequest(clientId, leaderId, getGroupId(),
-        DEFAULT_CALLID, message);
+        DEFAULT_CALLID, DEFAULT_SEQNUM, message);
   }
 
   public SetConfigurationRequest newSetConfigurationRequest(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 78c95c7..b8bc636 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -55,24 +55,32 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   }
 
   @Test
-  public void testAsyncConfiguration(){
+  public void testAsyncConfiguration() throws IOException {
     LOG.info("Running testAsyncConfiguration");
     RaftClient.Builder clientBuilder = RaftClient.newBuilder()
         .setRaftGroup(RaftGroup.emptyGroup())
         .setProperties(properties);
-    RaftClient client = clientBuilder.build();
     int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT;
     int maxOutstandingRequests = RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT;
-    RaftClientTestUtil.assertScheduler(client, numThreads);
-    RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
+    try(RaftClient client = clientBuilder.build()) {
+      RaftClientTestUtil.assertScheduler(client, numThreads);
+      RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
+    }
 
     numThreads = 200;
     maxOutstandingRequests = 5;
     RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, maxOutstandingRequests);
     RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads);
-    client = clientBuilder.build();
-    RaftClientTestUtil.assertScheduler(client, numThreads);
-    RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
+    try(RaftClient client = clientBuilder.build()) {
+      RaftClientTestUtil.assertScheduler(client, numThreads);
+      RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
+    }
+
+    // reset to default for other tests.
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
+        RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT);
+    RaftClientConfigKeys.Async.setSchedulerThreads(properties,
+        RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT);
   }
 
   @Test
@@ -128,4 +136,14 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     Assert.assertTrue(blockedRequestsCount.get() == 0);
     cluster.shutdown();
   }
+
+  @Test
+  public void testBasicAppendEntriesAsync() throws Exception {
+    LOG.info("Running testBasicAppendEntriesAsync");
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    cluster.start();
+    waitForLeader(cluster);
+    RaftBasicTests.runTestBasicAppendEntries(true, 10, cluster, LOG);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 89c40d0..85b165e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -38,6 +38,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.apache.ratis.server.storage.RaftLog;
+import org.slf4j.Logger;
 
 
 import static org.apache.ratis.RaftTestUtil.*;
@@ -113,19 +114,19 @@ public abstract class RaftBasicTests extends BaseTest {
 
   @Test
   public void testBasicAppendEntries() throws Exception {
-    testBasicAppendEntries(false);
+    runTestBasicAppendEntries(false, 10, getCluster(), LOG);
   }
 
-  protected void testBasicAppendEntries(boolean async) throws Exception {
-    LOG.info("Running testBasicAppendEntries");
-    final MiniRaftCluster cluster = getCluster();
+  static void runTestBasicAppendEntries(
+      boolean async, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
+    LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages);
     RaftServerImpl leader = waitForLeader(cluster);
     final long term = leader.getState().getCurrentTerm();
-    final RaftPeerId killed = cluster.getFollowers().get(3).getId();
+    final RaftPeerId killed = cluster.getFollowers().get(0).getId();
     cluster.killServer(killed);
     LOG.info(cluster.printServers());
 
-    final SimpleMessage[] messages = SimpleMessage.create(10);
+    final SimpleMessage[] messages = SimpleMessage.create(numMessages);
 
     try (final RaftClient client = cluster.createClient()) {
       final AtomicInteger asyncReplyCount = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 91aa58a..e02f999 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -70,15 +70,15 @@ public abstract class RetryCacheTests extends BaseTest {
     final MiniRaftCluster cluster = getCluster();
     RaftTestUtil.waitForLeader(cluster);
 
-
     final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId();
     long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
 
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
+    final long seqNum = 111;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -117,11 +117,13 @@ public abstract class RetryCacheTests extends BaseTest {
     RaftTestUtil.waitForLeader(cluster);
 
     final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId();
+
     final RaftClient client = cluster.createClient(leaderId);
     RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
+    final long seqNum = 111;
     RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message"));
+        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -139,7 +141,7 @@ public abstract class RetryCacheTests extends BaseTest {
     Assert.assertNotEquals(leaderId, newLeaderId);
     // same clientId and callId in the request
     r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(),
-        callId, new RaftTestUtil.SimpleMessage("message"));
+        callId, seqNum, new SimpleMessage("message"));
     for (int i = 0; i < 10; i++) {
       try {
         reply = rpc.sendRequest(r);


Reply via email to