Repository: incubator-ratis Updated Branches: refs/heads/master 5de0fa647 -> 1f07109d7
RATIS-141. In RaftClientProtocolService, the assumption of consecutive callId is invalid. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1f07109d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1f07109d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1f07109d Branch: refs/heads/master Commit: 1f07109d71c683455af484e16d3966b18d415c45 Parents: 5de0fa6 Author: Chen Liang <[email protected]> Authored: Tue Nov 28 15:17:32 2017 -0800 Committer: Chen Liang <[email protected]> Committed: Tue Nov 28 15:17:32 2017 -0800 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 42 +++++++++++++---- .../ratis/client/impl/RaftClientImpl.java | 10 ++++- .../ratis/protocol/RaftClientRequest.java | 18 ++++++-- .../java/org/apache/ratis/util/StringUtils.java | 16 +++++++ .../ratis/examples/ParameterizedBaseTest.java | 8 +++- .../TestRaftStateMachineException.java | 6 ++- .../org/apache/ratis/grpc/RaftGrpcUtil.java | 11 ++++- .../ratis/grpc/client/AppendStreamer.java | 29 ++++++------ .../grpc/client/RaftClientProtocolService.java | 47 +++++++++++--------- .../org/apache/ratis/grpc/TestRaftStream.java | 33 +++++++------- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 5 --- ratis-proto-shaded/src/main/proto/Raft.proto | 13 +++--- .../ratis/server/impl/PendingRequests.java | 2 +- .../ratis/server/impl/RaftServerConstants.java | 1 + .../ratis/server/impl/ServerProtoUtils.java | 3 +- .../java/org/apache/ratis/MiniRaftCluster.java | 4 +- .../java/org/apache/ratis/RaftAsyncTests.java | 32 ++++++++++--- .../java/org/apache/ratis/RaftBasicTests.java | 13 +++--- .../java/org/apache/ratis/RetryCacheTests.java | 10 +++-- 19 files changed, 203 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index a01c376..2d7c13e 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -22,6 +22,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReflectionUtils; +import org.apache.ratis.util.StringUtils; import java.util.Arrays; @@ -42,18 +43,19 @@ public class ClientProtoUtils { } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId) { + ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId, long seqNum) { return RaftRpcRequestProto.newBuilder() .setRequestorId(requesterId) .setReplyId(replyId) .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId)) - .setCallId(callId); + .setCallId(callId) + .setSeqNum(seqNum); } public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( - ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId) { + ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, long seqNum) { return toRaftRpcRequestProtoBuilder( - requesterId.toByteString(), replyId.toByteString(), groupId, callId); + requesterId.toByteString(), replyId.toByteString(), groupId, callId, seqNum); } private static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( @@ -62,7 +64,8 @@ public class ClientProtoUtils { request.getClientId(), request.getServerId(), request.getRaftGroupId(), - request.getCallId()); + request.getCallId(), + request.getSeqNum()); } public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { @@ -72,6 +75,7 @@ public class ClientProtoUtils { RaftPeerId.valueOf(request.getReplyId()), ProtoUtils.toRaftGroupId(request.getRaftGroupId()), request.getCallId(), + request.getSeqNum(), toMessage(p.getMessage()), p.getReadOnly()); } @@ -86,10 +90,10 @@ public class ClientProtoUtils { public static RaftClientRequestProto toRaftClientRequestProto( ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, - ByteString content, boolean readOnly) { + long seqNum, ByteString content, boolean readOnly) { return RaftClientRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder( - clientId, serverId, groupId, callId)) + clientId, serverId, groupId, callId, seqNum)) .setMessage(toClientMessageEntryProtoBuilder(content)) .setReadOnly(readOnly) .build(); @@ -204,7 +208,17 @@ public class ClientProtoUtils { } private static Message toMessage(final ClientMessageEntryProto p) { - return p::getContent; + return new Message() { + @Override + public ByteString getContent() { + return p.getContent(); + } + + @Override + public String toString() { + return StringUtils.bytes2HexShortString(getContent()); + } + }; } private static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) { @@ -270,4 +284,16 @@ public class ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) .build(); } + + public static String toString(RaftClientRequestProto proto) { + final RaftRpcRequestProto rpc = proto.getRpcRequest(); + return ClientId.valueOf(rpc.getRequestorId()) + "->" + rpc.getReplyId().toStringUtf8() + + "#" + rpc.getCallId() + "-" + rpc.getSeqNum(); + } + + public static String toString(RaftClientReplyProto proto) { + final RaftRpcReplyProto rpc = proto.getRpcReply(); + return ClientId.valueOf(rpc.getRequestorId()) + "<-" + rpc.getReplyId().toStringUtf8() + + "#" + rpc.getCallId(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 906d55f..9c66a9f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -51,6 +51,7 @@ final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; + private final AtomicLong asyncSeqNum = new AtomicLong(); private final ScheduledExecutorService scheduler; private final Semaphore asyncRequestSemaphore; @@ -68,6 +69,10 @@ final class RaftClientImpl implements RaftClient { clientRpc.addServers(peers); } + private long nextSeqNum() { + return asyncSeqNum.getAndIncrement() & Long.MAX_VALUE; + } + @Override public ClientId getId() { return clientId; @@ -93,8 +98,9 @@ final class RaftClientImpl implements RaftClient { "Interrupted when sending " + message, e)); } final long callId = nextCallId(); + final long seqNum = nextSeqNum(); return sendRequestWithRetryAsync( - () -> new RaftClientRequest(clientId, leaderId, groupId, callId, message, readOnly) + () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly) ).thenApply(reply -> { if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) { throw new CompletionException(reply.getException()); @@ -118,7 +124,7 @@ final class RaftClientImpl implements RaftClient { final long callId = nextCallId(); return sendRequestWithRetry(() -> new RaftClientRequest( - clientId, leaderId, groupId, callId, message, readOnly)); + clientId, leaderId, groupId, callId, 0L, message, readOnly)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 63e482d..c924ef8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -22,18 +22,26 @@ package org.apache.ratis.protocol; */ public class RaftClientRequest extends RaftClientMessage { private final long callId; + private final long seqNum; + private final Message message; private final boolean readOnly; public RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Message message) { - this(clientId, serverId, groupId, callId, message, false); + this(clientId, serverId, groupId, callId, 0L, message, false); + } + + public RaftClientRequest(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId, long callId, long seqNum, Message message) { + this(clientId, serverId, groupId, callId, seqNum, message, false); } public RaftClientRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId, Message message, boolean readOnly) { + RaftGroupId groupId, long callId, long seqNum, Message message, boolean readOnly) { super(clientId, serverId, groupId); this.callId = callId; + this.seqNum = seqNum; this.message = message; this.readOnly = readOnly; } @@ -47,6 +55,10 @@ public class RaftClientRequest extends RaftClientMessage { return callId; } + public long getSeqNum() { + return seqNum; + } + public Message getMessage() { return message; } @@ -57,7 +69,7 @@ public class RaftClientRequest extends RaftClientMessage { @Override public String toString() { - return super.toString() + ", callId: " + callId + ", " + return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " " + (isReadOnly()? "RO": "RW") + ", " + getMessage(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index 07a8973..0212a48 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -19,6 +19,7 @@ package org.apache.ratis.util; import org.apache.ratis.shaded.com.google.common.collect.Interner; import org.apache.ratis.shaded.com.google.common.collect.Interners; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; import java.io.PrintWriter; import java.io.StringWriter; @@ -64,6 +65,21 @@ public class StringUtils { return String.format(Locale.ENGLISH, format, objects); } + public static String bytes2HexShortString(ByteString bytes) { + final int size = bytes.size(); + if (size > 10) { + // return only the first 10 bytes + return bytes2HexString(bytes.substring(0, 10)) + "...(size=" + size + ")"; + } else { + return bytes2HexString(bytes); + } + } + + public static String bytes2HexString(ByteString bytes) { + Objects.requireNonNull(bytes, "bytes == null"); + return bytes2HexString(bytes.asReadOnlyByteBuffer()); + } + public static String bytes2HexString(byte[] bytes) { Objects.requireNonNull(bytes, "bytes == null"); return bytes2HexString(ByteBuffer.wrap(bytes)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java index de9fa6a..7cf74ea 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java @@ -38,9 +38,15 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReference; @RunWith(Parameterized.class) -public class ParameterizedBaseTest extends BaseTest { +public abstract class ParameterizedBaseTest extends BaseTest { public static final Logger LOG = LoggerFactory.getLogger(ParameterizedBaseTest.class); + /** Subclasses should override this method to provide real data parameters. */ + @Parameterized.Parameters + public static Collection<Object[]> data() throws IOException { + return Collections.emptyList(); + } + /** For {@link Parameterized} test so that a cluster can be shared by multiple {@link Test} */ private static final AtomicReference<MiniRaftCluster> currentCluster = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index 6eaa3ea..158875d 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -100,8 +100,9 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest { final RaftClient client = cluster.createClient(leaderId); final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; + final long seqNum = 111; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - cluster.getGroupId(), callId, new SimpleMessage("message")); + cluster.getGroupId(), callId, seqNum, new SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertFalse(reply.isSuccess()); Assert.assertNotNull(reply.getStateMachineException()); @@ -141,8 +142,9 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest { final RaftClient client = cluster.createClient(leaderId); final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; + final long seqNum = 111; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - cluster.getGroupId(), callId, new SimpleMessage("message")); + cluster.getGroupId(), callId, seqNum, new SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertTrue(reply.hasStateMachineException()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java index bae3682..d373ddb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -17,20 +17,20 @@ */ package org.apache.ratis.grpc; -import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.shaded.io.grpc.Metadata; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.util.CheckedSupplier; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.ReflectionUtils; import org.apache.ratis.util.StringUtils; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Function; public interface RaftGrpcUtil { @@ -38,6 +38,13 @@ public interface RaftGrpcUtil { Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); static StatusRuntimeException wrapException(Throwable t) { + Objects.requireNonNull(t, "t == null"); + if (t instanceof CompletionException) { + if (t.getCause() != null) { + t = t.getCause(); + } + } + Metadata trailers = new Metadata(); trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); return new StatusRuntimeException( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 16095ef..9c238f4 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -156,7 +156,7 @@ public class AppendStreamer implements Closeable { if (isRunning()) { // wrap the current buffer into a RaftClientRequestProto final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto( - clientId, leaderId, groupId, seqNum, content, false); + clientId, leaderId, groupId, seqNum, seqNum, content, false); if (request.getSerializedSize() > maxMessageSize) { throw new IOException("msg size:" + request.getSerializedSize() + " exceeds maximum:" + maxMessageSize); @@ -219,6 +219,7 @@ public class AppendStreamer implements Closeable { } } if (running == RunningState.RUNNING) { + Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty"); RaftClientRequestProto next = dataQueue.poll(); leaderProxy.onNext(next); ackQueue.offer(next); @@ -261,14 +262,14 @@ public class AppendStreamer implements Closeable { return; } synchronized (AppendStreamer.this) { - RaftClientRequestProto pending = Objects.requireNonNull( - ackQueue.peek()); + RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek()); if (reply.getRpcReply().getSuccess()) { - Preconditions.assertTrue(pending.getRpcRequest().getCallId() == - reply.getRpcReply().getCallId()); + Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(), + () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply)); ackQueue.poll(); - LOG.trace("{} received success ack for request {}", this, - pending.getRpcRequest()); + if (LOG.isTraceEnabled()) { + LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending)); + } // we've identified the correct leader if (running == RunningState.LOOK_FOR_LEADER) { running = RunningState.RUNNING; @@ -288,6 +289,7 @@ public class AppendStreamer implements Closeable { @Override public void onError(Throwable t) { + LOG.warn(this + " onError", t); if (active) { synchronized (AppendStreamer.this) { handleError(t, this); @@ -361,14 +363,11 @@ public class AppendStreamer implements Closeable { if (isRunning()) { // resend all the pending requests while (!ackQueue.isEmpty()) { - RaftClientRequestProto oldRequest = ackQueue.pollLast(); - RaftRpcRequestProto r = oldRequest.getRpcRequest(); - RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder() - .setMessage(oldRequest.getMessage()) - .setReadOnly(oldRequest.getReadOnly()) - .setRpcRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder( - clientId, newLeader, groupId, r.getCallId())) - .build(); + final RaftClientRequestProto oldRequest = ackQueue.pollLast(); + final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest()) + .setReplyId(newLeader.toByteString()); + final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest) + .setRpcRequest(newRpc).build(); dataQueue.offerFirst(newRequest); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index 5d68d42..f3ebe0f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -19,10 +19,7 @@ package org.apache.ratis.grpc.client; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; @@ -33,18 +30,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase { static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); private static class PendingAppend implements Comparable<PendingAppend> { - private final long callId; + private final RaftClientRequest request; private volatile RaftClientReply reply; - PendingAppend(long callId) { - this.callId = callId; + PendingAppend(RaftClientRequest request) { + this.request = request; } boolean isReady() { @@ -55,17 +51,25 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase this.reply = reply; } + RaftClientRequest getRequest() { + return request; + } + + long getSeqNum() { + return request != null? request.getSeqNum(): Long.MAX_VALUE; + } + @Override - public int compareTo(PendingAppend p) { - return callId == p.callId ? 0 : (callId < p.callId ? -1 : 1); + public int compareTo(PendingAppend that) { + return Long.compare(this.getSeqNum(), that.getSeqNum()); } @Override public String toString() { - return callId + ", reply:" + (reply == null ? "null" : reply.toString()); + return request != null? getSeqNum() + ":" + reply: "COMPLETED"; } } - private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); + private static final PendingAppend COMPLETED = new PendingAppend(null); private final Supplier<RaftPeerId> idSupplier; private final RaftClientAsynchronousProtocol protocol; @@ -105,31 +109,30 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase @Override public void onNext(RaftClientRequestProto request) { try { - PendingAppend p = new PendingAppend(request.getRpcRequest().getCallId()); + final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request); + final PendingAppend p = new PendingAppend(r); + final long replySeq = p.getSeqNum(); synchronized (pendingList) { pendingList.add(p); } - CompletableFuture<RaftClientReply> future = protocol.submitClientRequestAsync( - ClientProtoUtils.toRaftClientRequest(request)); - future.whenCompleteAsync((reply, exception) -> { + protocol.submitClientRequestAsync(r + ).whenCompleteAsync((reply, exception) -> { if (exception != null) { // TODO: the exception may be from either raft or state machine. // Currently we skip all the following responses when getting an // exception from the state machine. responseObserver.onError(RaftGrpcUtil.wrapException(exception)); } else { - final long replySeq = reply.getCallId(); synchronized (pendingList) { Preconditions.assertTrue(!pendingList.isEmpty(), - "PendingList is empty when handling onNext for callId %s", - replySeq); - final long headSeqNum = pendingList.get(0).callId; - // we assume the callId is consecutive for a stream RPC call + "PendingList is empty when handling onNext for seqNum %s", replySeq); + final long headSeqNum = pendingList.get(0).getSeqNum(); + // stream seqNum is consecutive final PendingAppend pendingForReply = pendingList.get( (int) (replySeq - headSeqNum)); Preconditions.assertTrue(pendingForReply != null && - pendingForReply.callId == replySeq, + pendingForReply.getSeqNum() == replySeq, "pending for reply is: %s, the pending list: %s", pendingForReply, pendingList); pendingForReply.setReply(reply); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index b9df622..de3177c 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -21,14 +21,16 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.grpc.client.AppendStreamer; import org.apache.ratis.grpc.client.RaftOutputStream; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.StringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -39,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.apache.ratis.RaftTestUtil.waitForLeader; @@ -75,7 +78,8 @@ public class TestRaftStream extends BaseTest { @Test public void testSimpleWrite() throws Exception { - LOG.info("Running testSimpleWrite"); + final int numRequests = 500; + LOG.info("Running testSimpleWrite, numRequests=" + numRequests); // default 64K is too large for a test GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4)); @@ -84,20 +88,17 @@ public class TestRaftStream extends BaseTest { cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - final Random r = new Random(); - final long seed = r.nextLong(); - r.setSeed(seed); try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(), cluster.getGroup(), leader.getId())) { - for (int i = 0; i < 500; i++) { // generate 500 requests - out.write(toBytes(r.nextInt())); + for (int i = 0; i < numRequests; i++) { // generate requests + out.write(toBytes(i)); } } // check the leader's raft log final RaftLog raftLog = leader.getState().getLog(); - r.setSeed(seed); - checkLog(raftLog, 500, () -> toBytes(r.nextInt())); + final AtomicInteger i = new AtomicInteger(); + checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement())); } private void checkLog(RaftLog raftLog, long expectedCommittedIndex, @@ -107,10 +108,11 @@ public class TestRaftStream extends BaseTest { // check the log content TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); for (TermIndex entry : entries) { - byte[] logData = raftLog.get(entry.getIndex()).getSmLogEntry().getData().toByteArray(); + RaftProtos.LogEntryProto log = raftLog.get(entry.getIndex()); + byte[] logData = log.getSmLogEntry().getData().toByteArray(); byte[] expected = s.get(); - Assert.assertEquals("log entry: " + entry, - expected.length, logData.length); + LOG.info("log " + entry + " " + log.getLogEntryBodyCase() + " " + StringUtils.bytes2HexString(logData)); + Assert.assertEquals(expected.length, logData.length); Assert.assertArrayEquals(expected, logData); } } @@ -262,7 +264,7 @@ public class TestRaftStream extends BaseTest { final RaftServerImpl leader = waitForLeader(cluster); final AtomicBoolean running = new AtomicBoolean(true); - final AtomicBoolean success = new AtomicBoolean(false); + final AtomicReference<Boolean> success = new AtomicReference<>(); final AtomicInteger result = new AtomicInteger(0); final CountDownLatch latch = new CountDownLatch(1); @@ -294,6 +296,7 @@ public class TestRaftStream extends BaseTest { running.set(false); latch.await(5, TimeUnit.SECONDS); + LOG.info("Writer success? " + success.get()); Assert.assertTrue(success.get()); // total number of tx should be >= result + 2, where 2 means two NoOp from // leaders. It may be larger than result+2 because the client may resend http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 2657bd1..76a64b3 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -56,11 +56,6 @@ public class TestRaftWithGrpc extends RaftBasicTests { BlockRequestHandlingInjection.getInstance().unblockAll(); } - @Test - public void testBasicAppendEntriesAsync() throws Exception { - super.testBasicAppendEntries(true); - } - @Override @Test public void testWithLoad() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index a8284fb..fea6bd0 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -76,16 +76,19 @@ message TermIndexProto { message RaftRpcRequestProto { bytes requestorId = 1; bytes replyId = 2; - uint64 callId = 3; - RaftGroupIdProto raftGroupId = 4; + RaftGroupIdProto raftGroupId = 3; + uint64 callId = 4; + + uint64 seqNum = 15; } message RaftRpcReplyProto { bytes requestorId = 1; bytes replyId = 2; - uint64 callId = 3; - bool success = 4; - RaftGroupIdProto raftGroupId = 5; + RaftGroupIdProto raftGroupId = 3; + uint64 callId = 4; + + bool success = 15; } message FileChunkProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index ab8ad2c..f651230 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -53,7 +53,7 @@ class PendingRequests { PendingRequest remove(long index) { final PendingRequest r = map.remove(index); - LOG.debug("{}: PendingRequests.remove{} returns {}", name, index, r); + LOG.debug("{}: PendingRequests.remove {} returns {}", name, index, r); return r; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java index b4db46e..35e3df8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java @@ -21,6 +21,7 @@ public interface RaftServerConstants { long INVALID_LOG_INDEX = -1; byte LOG_TERMINATE_BYTE = 0; long DEFAULT_CALLID = 0; + long DEFAULT_SEQNUM = 0L; enum StartupOption { FORMAT("format"), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index d660f76..9f84e05 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.impl; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; import java.util.Arrays; import java.util.List; @@ -131,7 +132,7 @@ public class ServerProtoUtils { static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) { return ClientProtoUtils.toRaftRpcRequestProtoBuilder( - requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID); + requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, DEFAULT_SEQNUM); } public static RequestVoteRequestProto toRequestVoteRequestProto( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index a6d4bdd..88fddf4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -46,6 +46,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); @@ -325,6 +326,7 @@ public abstract class MiniRaftCluster { } public void killServer(RaftPeerId id) { + LOG.info("killServer " + id); servers.get(id).close(); } @@ -484,7 +486,7 @@ public abstract class MiniRaftCluster { public RaftClientRequest newRaftClientRequest( ClientId clientId, RaftPeerId leaderId, Message message) { return new RaftClientRequest(clientId, leaderId, getGroupId(), - DEFAULT_CALLID, message); + DEFAULT_CALLID, DEFAULT_SEQNUM, message); } public SetConfigurationRequest newSetConfigurationRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 78c95c7..b8bc636 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -55,24 +55,32 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba } @Test - public void testAsyncConfiguration(){ + public void testAsyncConfiguration() throws IOException { LOG.info("Running testAsyncConfiguration"); RaftClient.Builder clientBuilder = RaftClient.newBuilder() .setRaftGroup(RaftGroup.emptyGroup()) .setProperties(properties); - RaftClient client = clientBuilder.build(); int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT; int maxOutstandingRequests = RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT; - RaftClientTestUtil.assertScheduler(client, numThreads); - RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); + try(RaftClient client = clientBuilder.build()) { + RaftClientTestUtil.assertScheduler(client, numThreads); + RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); + } numThreads = 200; maxOutstandingRequests = 5; RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, maxOutstandingRequests); RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads); - client = clientBuilder.build(); - RaftClientTestUtil.assertScheduler(client, numThreads); - RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); + try(RaftClient client = clientBuilder.build()) { + RaftClientTestUtil.assertScheduler(client, numThreads); + RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0); + } + + // reset to default for other tests. + RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, + RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT); + RaftClientConfigKeys.Async.setSchedulerThreads(properties, + RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT); } @Test @@ -128,4 +136,14 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba Assert.assertTrue(blockedRequestsCount.get() == 0); cluster.shutdown(); } + + @Test + public void testBasicAppendEntriesAsync() throws Exception { + LOG.info("Running testBasicAppendEntriesAsync"); + final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties); + cluster.start(); + waitForLeader(cluster); + RaftBasicTests.runTestBasicAppendEntries(true, 10, cluster, LOG); + cluster.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 89c40d0..85b165e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -38,6 +38,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.apache.ratis.server.storage.RaftLog; +import org.slf4j.Logger; import static org.apache.ratis.RaftTestUtil.*; @@ -113,19 +114,19 @@ public abstract class RaftBasicTests extends BaseTest { @Test public void testBasicAppendEntries() throws Exception { - testBasicAppendEntries(false); + runTestBasicAppendEntries(false, 10, getCluster(), LOG); } - protected void testBasicAppendEntries(boolean async) throws Exception { - LOG.info("Running testBasicAppendEntries"); - final MiniRaftCluster cluster = getCluster(); + static void runTestBasicAppendEntries( + boolean async, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception { + LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages); RaftServerImpl leader = waitForLeader(cluster); final long term = leader.getState().getCurrentTerm(); - final RaftPeerId killed = cluster.getFollowers().get(3).getId(); + final RaftPeerId killed = cluster.getFollowers().get(0).getId(); cluster.killServer(killed); LOG.info(cluster.printServers()); - final SimpleMessage[] messages = SimpleMessage.create(10); + final SimpleMessage[] messages = SimpleMessage.create(numMessages); try (final RaftClient client = cluster.createClient()) { final AtomicInteger asyncReplyCount = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1f07109d/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index 91aa58a..e02f999 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -70,15 +70,15 @@ public abstract class RetryCacheTests extends BaseTest { final MiniRaftCluster cluster = getCluster(); RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); final RaftClient client = cluster.createClient(leaderId); final RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; + final long seqNum = 111; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, seqNum, new SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertEquals(callId, reply.getCallId()); Assert.assertTrue(reply.isSuccess()); @@ -117,11 +117,13 @@ public abstract class RetryCacheTests extends BaseTest { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); + final RaftClient client = cluster.createClient(leaderId); RaftClientRpc rpc = client.getClientRpc(); final long callId = 999; + final long seqNum = 111; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, - cluster.getGroupId(), callId, new RaftTestUtil.SimpleMessage("message")); + cluster.getGroupId(), callId, seqNum, new SimpleMessage("message")); RaftClientReply reply = rpc.sendRequest(r); Assert.assertEquals(callId, reply.getCallId()); Assert.assertTrue(reply.isSuccess()); @@ -139,7 +141,7 @@ public abstract class RetryCacheTests extends BaseTest { Assert.assertNotEquals(leaderId, newLeaderId); // same clientId and callId in the request r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(), - callId, new RaftTestUtil.SimpleMessage("message")); + callId, seqNum, new SimpleMessage("message")); for (int i = 0; i < 10; i++) { try { reply = rpc.sendRequest(r);
