Repository: incubator-ratis Updated Branches: refs/heads/master a1edeabea -> 959d493c0
RATIS-442. In RaftClient, remove ReplicationLevel parameter from send(..) and sendAsync(..) methods. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/959d493c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/959d493c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/959d493c Branch: refs/heads/master Commit: 959d493c0eacce6712bffe39924bd0440d11a6b0 Parents: a1edeab Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Nov 30 14:17:45 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Nov 30 14:17:45 2018 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 18 +--- .../ratis/client/impl/RaftClientImpl.java | 10 +- .../ratis/protocol/RaftClientRequest.java | 17 +--- .../hadooprpc/TestRetryCacheWithHadoopRpc.java | 29 +----- ratis-proto/src/main/proto/Raft.proto | 1 - .../apache/ratis/server/impl/LeaderState.java | 12 +-- .../ratis/server/impl/PendingRequest.java | 44 +-------- .../ratis/server/impl/PendingRequests.java | 79 +--------------- .../ratis/server/impl/RaftServerImpl.java | 9 +- .../java/org/apache/ratis/MiniRaftCluster.java | 13 +-- .../java/org/apache/ratis/RaftAsyncTests.java | 25 +---- .../java/org/apache/ratis/RaftBasicTests.java | 82 +++------------- .../java/org/apache/ratis/RetryCacheTests.java | 51 ++++------ .../ratis/grpc/TestRetryCacheWithGrpc.java | 98 +------------------- .../ratis/netty/TestRetryCacheWithNettyRpc.java | 28 +----- .../TestRetryCacheWithSimulatedRpc.java | 28 +----- 16 files changed, 74 insertions(+), 470 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index deb91f8..ec72d8c 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -50,15 +50,9 @@ public interface RaftClient extends Closeable { * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead. * * @param message The request message. - * @param replication The replication level required. * @return a future of the reply. */ - CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication); - - /** The same as sendAsync(message, MAJORITY). */ - default CompletableFuture<RaftClientReply> sendAsync(Message message) { - return sendAsync(message, ReplicationLevel.MAJORITY); - } + CompletableFuture<RaftClientReply> sendAsync(Message message); /** Async call to send the given readonly message to the raft service. */ CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message); @@ -75,15 +69,9 @@ public interface RaftClient extends Closeable { * For readonly messages, use {@link #sendReadOnly(Message)} instead. * * @param message The request message. - * @param replication The replication level required. * @return the reply. */ - RaftClientReply send(Message message, ReplicationLevel replication) throws IOException; - - /** The same as send(message, MAJORITY). */ - default RaftClientReply send(Message message) throws IOException { - return send(message, ReplicationLevel.MAJORITY); - } + RaftClientReply send(Message message) throws IOException; /** Send the given readonly message to the raft service. */ RaftClientReply sendReadOnly(Message message) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 22c958f..4c73d45 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -137,8 +137,8 @@ final class RaftClientImpl implements RaftClient { } @Override - public CompletableFuture<RaftClientReply> sendAsync(Message message, ReplicationLevel replication) { - return sendAsync(RaftClientRequest.writeRequestType(replication), message, null); + public CompletableFuture<RaftClientReply> sendAsync(Message message) { + return sendAsync(RaftClientRequest.writeRequestType(), message, null); } @Override @@ -183,8 +183,8 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientReply send(Message message, ReplicationLevel replication) throws IOException { - return send(RaftClientRequest.writeRequestType(replication), message, null); + public RaftClientReply send(Message message) throws IOException { + return send(RaftClientRequest.writeRequestType(), message, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 48d203a..08397c3 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,19 +29,12 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase. */ public class RaftClientRequest extends RaftClientMessage { private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance()); - private static final Type WRITE_ALL = new Type( - WriteRequestTypeProto.newBuilder().setReplication(ReplicationLevel.ALL).build()); private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance()); private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance()); - public static Type writeRequestType(ReplicationLevel replication) { - switch (replication) { - case MAJORITY: return WRITE_DEFAULT; - case ALL: return WRITE_ALL; - default: - throw new IllegalArgumentException("Unexpected replication: " + replication); - } + public static Type writeRequestType() { + return WRITE_DEFAULT; } public static Type readRequestType() { @@ -60,7 +53,7 @@ public class RaftClientRequest extends RaftClientMessage { /** The type of a request (oneof write, read, staleRead, watch; see the message RaftClientRequestProto). */ public static class Type { public static Type valueOf(WriteRequestTypeProto write) { - return writeRequestType(write.getReplication()); + return WRITE_DEFAULT; } public static Type valueOf(ReadRequestTypeProto read) { @@ -145,7 +138,7 @@ public class RaftClientRequest extends RaftClientMessage { public String toString() { switch (typeCase) { case WRITE: - return "RW" + toString(getWrite().getReplication()); + return "RW"; case READ: return "RO"; case STALEREAD: http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java index 8c0dc3a..96caf16 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,30 +17,9 @@ */ package org.apache.ratis.hadooprpc; -import org.apache.log4j.Level; import org.apache.ratis.RetryCacheTests; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.LogUtils; -import java.io.IOException; - -public class TestRetryCacheWithHadoopRpc extends RetryCacheTests { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithHadoopRpc cluster; - - public TestRetryCacheWithHadoopRpc() throws IOException { - cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster( - NUM_SERVERS, getProperties()); - } - - @Override - public MiniRaftClusterWithHadoopRpc getCluster() { - return cluster; - } +public class TestRetryCacheWithHadoopRpc + extends RetryCacheTests<MiniRaftClusterWithHadoopRpc> + implements MiniRaftClusterWithHadoopRpc.Factory.Get { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 43c355b..83d4394 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -212,7 +212,6 @@ enum RaftPeerRole { } message WriteRequestTypeProto { - ReplicationLevel replication = 1; } message ReadRequestTypeProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index c8d96b5..032c3a9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -582,7 +582,6 @@ public class LeaderState { } watchRequests.update(ReplicationLevel.ALL, min); - pendingRequests.checkDelayedReplies(min); } private void logMetadata(long commitIndex) { @@ -681,13 +680,8 @@ public class LeaderState { return lists; } - /** @return true if the request is replied; otherwise, the reply is delayed, return false. */ - boolean replyPendingRequest(long logIndex, RaftClientReply reply, RetryCache.CacheEntry cacheEntry) { - if (!pendingRequests.replyPendingRequest(logIndex, reply, cacheEntry)) { - submitUpdateCommitEvent(); - return false; - } - return true; + void replyPendingRequest(long logIndex, RaftClientReply reply) { + pendingRequests.replyPendingRequest(logIndex, reply); } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index 35e3082..a514c63 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,44 +19,18 @@ package org.apache.ratis.server.impl; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.*; -import org.apache.ratis.server.impl.RetryCache.CacheEntry; -import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; import java.util.Collection; -import java.util.Objects; import java.util.concurrent.CompletableFuture; public class PendingRequest implements Comparable<PendingRequest> { - private static class DelayedReply { - private final RaftClientReply reply; - private final CacheEntry cacheEntry; - - DelayedReply(RaftClientReply reply, CacheEntry cacheEntry) { - this.reply = reply; - this.cacheEntry = cacheEntry; - } - - RaftClientReply getReply() { - cacheEntry.updateResult(reply); - return reply; - } - - RaftClientReply fail(NotReplicatedException e) { - final RaftClientReply failed = new RaftClientReply(reply, e); - cacheEntry.updateResult(failed); - return failed; - } - } - private final long index; private final RaftClientRequest request; private final TransactionContext entry; private final CompletableFuture<RaftClientReply> future; - private volatile DelayedReply delayed; - PendingRequest(long index, RaftClientRequest request, TransactionContext entry) { this.index = index; this.request = request; @@ -97,22 +71,6 @@ public class PendingRequest implements Comparable<PendingRequest> { future.complete(r); } - synchronized void setDelayedReply(RaftClientReply r, CacheEntry c) { - Objects.requireNonNull(r); - Preconditions.assertTrue(delayed == null); - delayed = new DelayedReply(r, c); - } - - synchronized void completeDelayedReply() { - setReply(delayed.getReply()); - } - - synchronized void failDelayedReply() { - final ReplicationLevel replication = request.getType().getWrite().getReplication(); - final NotReplicatedException e = new NotReplicatedException(request.getCallId(), replication, index); - setReply(delayed.fail(e)); - } - TransactionContext setNotLeaderException(NotLeaderException nle, Collection<CommitInfoProto> commitInfos) { setReply(new RaftClientReply(getRequest(), nle, commitInfos)); return getEntry(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index f062e5b..8847a99 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,8 +19,6 @@ package org.apache.ratis.server.impl; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.*; -import org.apache.ratis.server.impl.RetryCache.CacheEntry; -import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.Preconditions; @@ -28,10 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -76,70 +72,13 @@ class PendingRequests { } } - private static class DelayedReplies { - private final String name; - private final PriorityQueue<PendingRequest> q = new PriorityQueue<>(); - private AtomicLong allAckedIndex = new AtomicLong(); - - private DelayedReplies(Object name) { - this.name = name + "-" + getClass().getSimpleName(); - } - - boolean delay(PendingRequest request, RaftClientReply reply, CacheEntry cacheEntry) { - if (request.getIndex() <= allAckedIndex.get()) { - return false; // delay is not required. - } - - LOG.debug("{}: delay request {}", name, request); - request.setDelayedReply(reply, cacheEntry); - final boolean offered; - synchronized (q) { - offered = q.offer(request); - } - Preconditions.assertTrue(offered); - return true; - } - - void update(final long allAcked) { - final long old = allAckedIndex.getAndUpdate(n -> allAcked > n? allAcked : n); - if (allAcked <= old) { - return; - } - - LOG.debug("{}: update allAckedIndex {} -> {}", name, old, allAcked); - for(;;) { - final PendingRequest polled; - synchronized (q) { - final PendingRequest peeked = q.peek(); - if (peeked == null || peeked.getIndex() > allAcked) { - return; - } - polled = q.poll(); - Preconditions.assertTrue(polled == peeked); - } - LOG.debug("{}: complete delay request {}", name, polled); - polled.completeDelayedReply(); - } - } - - void failReplies() { - synchronized (q) { - for(; !q.isEmpty();) { - q.poll().failDelayedReply(); - } - } - } - } - private PendingRequest pendingSetConf; private final String name; private final RequestMap pendingRequests; - private final DelayedReplies delayedReplies; PendingRequests(RaftPeerId id) { this.name = id + "-" + getClass().getSimpleName(); this.pendingRequests = new RequestMap(id); - this.delayedReplies = new DelayedReplies(id); } PendingRequest add(RaftClientRequest request, TransactionContext entry) { @@ -185,21 +124,12 @@ class PendingRequests { return pendingRequest != null ? pendingRequest.getEntry() : null; } - /** @return true if the request is replied; otherwise, the reply is delayed, return false. */ - boolean replyPendingRequest(long index, RaftClientReply reply, CacheEntry cacheEntry) { + void replyPendingRequest(long index, RaftClientReply reply) { final PendingRequest pending = pendingRequests.remove(index); if (pending != null) { Preconditions.assertTrue(pending.getIndex() == index); - - final ReplicationLevel replication = pending.getRequest().getType().getWrite().getReplication(); - if (replication == ReplicationLevel.ALL) { - if (delayedReplies.delay(pending, reply, cacheEntry)) { - return false; - } - } pending.setReply(reply); } - return true; } /** @@ -213,11 +143,6 @@ class PendingRequests { if (pendingSetConf != null) { pendingSetConf.setNotLeaderException(nle, commitInfos); } - delayedReplies.failReplies(); return transactions; } - - void checkDelayedReplies(long allAckedIndex) { - delayedReplies.update(allAckedIndex); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 30ccdc7..3c42fcd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1053,18 +1053,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } // update pending request - boolean updateCache = true; // always update cache for follower synchronized (RaftServerImpl.this) { final LeaderState leaderState = role.getLeaderState().orElse(null); if (isLeader() && leaderState != null) { // is leader and is running - // For leader, update cache unless the reply is delayed. - // When a reply is delayed, the cache will be updated in DelayedReply.getReply(). - updateCache = leaderState.replyPendingRequest(logIndex, r, cacheEntry); + leaderState.replyPendingRequest(logIndex, r); } } - if (updateCache) { - cacheEntry.updateResult(r); - } + cacheEntry.updateResult(r); }); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 9d08bd0..18358a2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -425,10 +425,6 @@ public abstract class MiniRaftCluster implements Closeable { return b.toString(); } - public RaftServerImpl getLeaderAndSendFirstMessage() throws IOException { - return getLeaderAndSendFirstMessage(false); - } - public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException { final RaftServerImpl leader = getLeader(); try(RaftClient client = createClient(leader.getId())) { @@ -629,13 +625,8 @@ public abstract class MiniRaftCluster implements Closeable { public RaftClientRequest newRaftClientRequest( ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) { - return newRaftClientRequest(clientId, leaderId, callId, seqNum, message, ReplicationLevel.MAJORITY); - } - - public RaftClientRequest newRaftClientRequest( - ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message, ReplicationLevel replication) { return new RaftClientRequest(clientId, leaderId, getGroupId(), - callId, seqNum, message, RaftClientRequest.writeRequestType(replication)); + callId, seqNum, message, RaftClientRequest.writeRequestType()); } public SetConfigurationRequest newSetConfigurationRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 46630b4..e459e7c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,12 +27,10 @@ import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.JavaUtils; @@ -50,7 +48,6 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.ratis.RaftBasicTests.runTestDelayRequestIfLeaderStepDown; import static org.apache.ratis.RaftTestUtil.waitForLeader; public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest @@ -174,12 +171,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba cluster.shutdown(); } - void runTestBasicAppendEntriesAsync(ReplicationLevel replication, boolean killLeader) throws Exception { + void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception { final CLUSTER cluster = newCluster(killLeader? 5: 3); try { cluster.start(); waitForLeader(cluster); - RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 100, cluster, LOG); + RaftBasicTests.runTestBasicAppendEntries(true, killLeader, 100, cluster, LOG); } finally { cluster.shutdown(); } @@ -187,17 +184,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba @Test public void testBasicAppendEntriesAsync() throws Exception { - runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY, false); + runTestBasicAppendEntriesAsync(false); } @Test public void testBasicAppendEntriesAsyncKillLeader() throws Exception { - runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY, true); - } - - @Test - public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception { - runTestBasicAppendEntriesAsync(ReplicationLevel.ALL, false); + runTestBasicAppendEntriesAsync(true); } @Test @@ -335,11 +327,4 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba //reset for the other tests RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime); } - - @Test - public void testAsyncDelayRequestIfLeaderStepDown() throws Exception { - final CLUSTER cluster = newCluster(5); - cluster.start(); - runTestDelayRequestIfLeaderStepDown(true, cluster, LOG); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index fabd29a..f37fc21 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.impl.RaftClientTestUtil; -import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; @@ -33,7 +32,6 @@ import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; @@ -79,7 +77,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> public void testBasicAppendEntries() throws Exception { try(CLUSTER cluster = newCluster(NUM_SERVERS)) { cluster.start(); - runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, false, 10, cluster, LOG); + runTestBasicAppendEntries(false, false, 10, cluster, LOG); } } @@ -87,15 +85,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> public void testBasicAppendEntriesKillLeader() throws Exception { try(CLUSTER cluster = newCluster(NUM_SERVERS)) { cluster.start(); - runTestBasicAppendEntries(false, ReplicationLevel.MAJORITY, true, 10, cluster, LOG); - } - } - - @Test - public void testBasicAppendEntriesWithAllReplication() throws Exception { - try(CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTestBasicAppendEntries(false, ReplicationLevel.ALL, false, 10, cluster, LOG); + runTestBasicAppendEntries(false, true, 10, cluster, LOG); } } @@ -112,10 +102,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } static void runTestBasicAppendEntries( - boolean async, ReplicationLevel replication, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger LOG) + boolean async, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception { - LOG.info("runTestBasicAppendEntries: async? {}, replication={}, killLeader={}, numMessages={}", - async, replication, killLeader, numMessages); + LOG.info("runTestBasicAppendEntries: async? {}, killLeader={}, numMessages={}", + async, killLeader, numMessages); for (RaftServer s : cluster.getServers()) { cluster.restartServer(s.getId(), false); } @@ -138,7 +128,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> for (SimpleMessage message : messages) { if (async) { - client.sendAsync(message, replication).thenAcceptAsync(reply -> { + client.sendAsync(message).thenAcceptAsync(reply -> { if (!reply.isSuccess()) { f.completeExceptionally( new AssertionError("Failed with reply " + reply)); @@ -147,7 +137,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } }); } else { - final RaftClientReply reply = client.send(message, replication); + final RaftClientReply reply = client.send(message); Preconditions.assertTrue(reply.isSuccess()); } } @@ -156,16 +146,14 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> Assert.assertEquals(messages.length, asyncReplyCount.get()); } } - if (replication != ReplicationLevel.ALL) { - Thread.sleep(cluster.getMaxTimeout() + 100); - } + Thread.sleep(cluster.getTimeoutMax().toInt(TimeUnit.MILLISECONDS) + 100); LOG.info(cluster.printAllLogs()); for(RaftServerProxy server : cluster.getServers()) { final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId()); - if (impl.isAlive() || replication == ReplicationLevel.ALL) { + if (impl.isAlive()) { JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages), - 5, 1000, impl.getId() + " assertLogEntries", LOG); + 5, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG); } } } @@ -446,52 +434,4 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0); } } - - @Test - public void testDelayRequestIfLeaderStepDown() throws Exception { - try(CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTestDelayRequestIfLeaderStepDown(false, cluster, LOG); - } - } - - static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception { - boolean skipfirstserver = false; - for (RaftServer s : cluster.getServers()) { - if (!skipfirstserver) { - skipfirstserver = true; - cluster.killServer(s.getId()); - continue; - } - cluster.restartServer(s.getId(), false); - } - final RaftServerImpl leader = waitForLeader(cluster); - LOG.info("leader: " + leader.getId() + ", " + cluster.printServers()); - - final SimpleMessage message = SimpleMessage.create(1)[0]; - try (final RaftClient client = cluster.createClientWithLeader()) { - final RaftClientReply reply; - if (async) { - final CompletableFuture<RaftClientReply> f = client.sendAsync(message, ReplicationLevel.ALL); - Thread.sleep(1000); - RaftTestUtil.changeLeader(cluster, leader.getId()); - - reply = f.get(); - } else { - new Thread(() -> { - try { - Thread.sleep(1000); - RaftTestUtil.changeLeader(cluster, leader.getId()); - } catch (Exception e) { - LOG.warn("changeLeader", e); - } - }).start(); - - reply = client.send(message, ReplicationLevel.ALL); - } - throw reply.getNotReplicatedException(); - } catch (NotReplicatedException e) { - LOG.info("Expected", e); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index a77e6e4..2aa1d85 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,11 +17,11 @@ */ package org.apache.ratis; +import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster.PeerChanges; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -32,41 +32,24 @@ import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftLogIOException; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Arrays; import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; -public abstract class RetryCacheTests extends BaseTest { - public static final int NUM_SERVERS = 3; - protected static final RaftProperties properties = new RaftProperties(); - - public abstract MiniRaftCluster getCluster(); - - public RaftProperties getProperties() { - return properties; +public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); } - @Before - public void setup() throws IOException { - Assert.assertNull(getCluster().getLeader()); - getCluster().start(); - } - - @After - public void tearDown() { - final MiniRaftCluster cluster = getCluster(); - if (cluster != null) { - cluster.shutdown(); - } - } + public static final int NUM_SERVERS = 3; /** * make sure the retry cache can correct capture the retry from a client, @@ -74,10 +57,12 @@ public abstract class RetryCacheTests extends BaseTest { */ @Test public void testBasicRetry() throws Exception { - final MiniRaftCluster cluster = getCluster(); - RaftTestUtil.waitForLeader(cluster); + runWithNewCluster(NUM_SERVERS, this::runTestBasicRetry); + } - final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); + void runTestBasicRetry(CLUSTER cluster) throws Exception { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId(); long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); final RaftClient client = cluster.createClient(leaderId); @@ -135,10 +120,12 @@ public abstract class RetryCacheTests extends BaseTest { */ @Test public void testRetryOnNewLeader() throws Exception { - final MiniRaftCluster cluster = getCluster(); - RaftTestUtil.waitForLeader(cluster); + runWithNewCluster(NUM_SERVERS, this::runTestRetryOnNewLeader); + } - final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); + void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId(); final RaftClient client = cluster.createClient(leaderId); RaftClientRpc rpc = client.getClientRpc(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java index cd04b43..579cd24 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,99 +17,9 @@ */ package org.apache.ratis.grpc; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.log4j.Level; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RetryCacheTests; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.proto.RaftProtos; -import org.apache.ratis.util.LogUtils; -import org.junit.Assert; -import org.junit.Test; - -public class TestRetryCacheWithGrpc extends RetryCacheTests { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithGrpc cluster; - - public TestRetryCacheWithGrpc() throws IOException { - cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster( - NUM_SERVERS, properties); - Assert.assertNull(cluster.getLeader()); - } - - @Override - public MiniRaftClusterWithGrpc getCluster() { - return cluster; - } - - @Test - public void testAsyncRetryWithReplicatedAll() throws Exception { - final MiniRaftCluster cluster = getCluster(); - RaftTestUtil.waitForLeader(cluster); - - final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage().getId(); - long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); - - // Kill a follower - final RaftPeerId killedFollower = cluster.getFollowers().get(0).getId(); - cluster.killServer(killedFollower); - - final long callId = 999; - final long seqNum = 111; - final ClientId clientId = ClientId.randomId(); - - // Retry with the same clientId and callId - final List<CompletableFuture<RaftClient>> futures = new ArrayList<>(); - futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster)); - futures.addAll(sendRetry(clientId, leaderId, callId, seqNum, cluster)); - - // restart the killed follower - cluster.restartServer(killedFollower, false); - for(CompletableFuture<RaftClient> f : futures) { - f.join().close(); - } - assertServer(cluster, clientId, callId, oldLastApplied); - } - - List<CompletableFuture<RaftClient>> sendRetry( - ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, MiniRaftCluster cluster) - throws Exception { - List<CompletableFuture<RaftClient>> futures = new ArrayList<>(); - final int numRequest = 3; - for (int i = 0; i < numRequest; i++) { - final RaftClient client = cluster.createClient(leaderId, cluster.getGroup(), clientId); - final RaftClientRpc rpc = client.getClientRpc(); - final RaftClientRequest request = cluster.newRaftClientRequest(client.getId(), leaderId, - callId, seqNum, new RaftTestUtil.SimpleMessage("message"), RaftProtos.ReplicationLevel.ALL); - - LOG.info("{} sendRequestAsync {}", i, request); - futures.add(rpc.sendRequestAsync(request) - .thenApply(reply -> assertReply(reply, client, callId))); - } - for(CompletableFuture<RaftClient> f : futures) { - try { - f.get(200, TimeUnit.MILLISECONDS); - Assert.fail("It should timeout for ReplicationLevel.ALL since a follower is down"); - } catch(TimeoutException te) { - LOG.info("Expected " + te); - } - } - return futures; - } +public class TestRetryCacheWithGrpc + extends RetryCacheTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-test/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java index 659e426..3b5de5a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,29 +17,9 @@ */ package org.apache.ratis.netty; -import java.io.IOException; - -import org.apache.log4j.Level; import org.apache.ratis.RetryCacheTests; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.LogUtils; - -public class TestRetryCacheWithNettyRpc extends RetryCacheTests { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithNetty cluster; - - public TestRetryCacheWithNettyRpc() throws IOException { - cluster = MiniRaftClusterWithNetty.FACTORY.newCluster( - NUM_SERVERS, getProperties()); - } - @Override - public MiniRaftClusterWithNetty getCluster() { - return cluster; - } +public class TestRetryCacheWithNettyRpc + extends RetryCacheTests<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/959d493c/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java index a088578..5f01825 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,29 +17,9 @@ */ package org.apache.ratis.server.simulation; -import java.io.IOException; - -import org.apache.log4j.Level; import org.apache.ratis.RetryCacheTests; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.util.LogUtils; - -public class TestRetryCacheWithSimulatedRpc extends RetryCacheTests { - static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithSimulatedRpc cluster; - - public TestRetryCacheWithSimulatedRpc() throws IOException { - cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster( - NUM_SERVERS, getProperties()); - } - @Override - public MiniRaftClusterWithSimulatedRpc getCluster() { - return cluster; - } +public class TestRetryCacheWithSimulatedRpc + extends RetryCacheTests<MiniRaftClusterWithSimulatedRpc> + implements MiniRaftClusterWithSimulatedRpc.FactoryGet { }
