Repository: incubator-ratis Updated Branches: refs/heads/master 771e8adc4 -> 42fff2b26
RATIS-17. Add basic retry cache implementation for Raft Server. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/42fff2b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/42fff2b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/42fff2b2 Branch: refs/heads/master Commit: 42fff2b266a7841e075cc2202fe4530a4c4cab61 Parents: 771e8ad Author: Jing Zhao <[email protected]> Authored: Fri Mar 31 00:17:37 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Fri Mar 31 00:17:37 2017 -0700 ---------------------------------------------------------------------- .../TestRaftStateMachineException.java | 116 +++++++++- .../ratis/grpc/TestRetryCacheWithGrpc.java | 46 ++++ .../hadooprpc/TestRetryCacheWithHadoopRpc.java | 46 ++++ .../ratis/netty/TestRetryCacheWithNettyRpc.java | 45 ++++ .../ratis/server/RaftServerConfigKeys.java | 21 ++ .../apache/ratis/server/impl/LeaderState.java | 6 +- .../ratis/server/impl/PendingRequest.java | 3 + .../ratis/server/impl/PendingRequests.java | 16 +- .../ratis/server/impl/RaftServerImpl.java | 132 ++++++++--- .../apache/ratis/server/impl/RetryCache.java | 225 +++++++++++++++++++ .../ratis/server/impl/StateMachineUpdater.java | 2 +- .../org/apache/ratis/RaftRetryCacheTests.java | 174 ++++++++++++++ .../ratis/server/impl/RaftServerTestUtil.java | 14 ++ .../TestRetryCacheWithSimulatedRpc.java | 45 ++++ 14 files changed, 842 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index 447f2ea..d8a32f6 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -21,11 +21,16 @@ import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.examples.RaftExamplesTestUtil; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.impl.RetryCache; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.util.RaftUtils; @@ -33,6 +38,8 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -42,6 +49,8 @@ import static org.junit.Assert.fail; @RunWith(Parameterized.class) public class TestRaftStateMachineException { + public static final Logger LOG = LoggerFactory.getLogger(TestRaftStateMachineException.class); + static { RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); @@ -49,6 +58,8 @@ public class TestRaftStateMachineException { RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } + protected static boolean failPreAppend = false; + protected static class StateMachineWithException extends SimpleStateMachine4Testing { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { @@ -56,6 +67,16 @@ public class TestRaftStateMachineException { future.completeExceptionally(new StateMachineException("Fake Exception")); return future; } + + @Override + public TransactionContext preAppendTransaction(TransactionContext trx) + throws IOException { + if (failPreAppend) { + throw new IOException("Fake Exception"); + } else { + return trx; + } + } } @Parameterized.Parameters @@ -69,7 +90,7 @@ public class TestRaftStateMachineException { @Test public void testHandleStateMachineException() throws Exception { - cluster.start(); + cluster.restart(true); RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); @@ -84,4 +105,97 @@ public class TestRaftStateMachineException { cluster.shutdown(); } + + @Test + public void testRetryOnStateMachineException() throws Exception { + cluster.restart(true); + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + RaftClient client = cluster.createClient(leaderId); + try { + client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready")); + } catch (Exception ignored) { + } + long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); + + final RaftClientRpc rpc = client.getClientRpc(); + final long callId = 999; + RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, + callId, new RaftTestUtil.SimpleMessage("message")); + RaftClientReply reply = rpc.sendRequest(r); + Assert.assertFalse(reply.isSuccess()); + Assert.assertNotNull(reply.getStateMachineException()); + + // retry with the same callId + for (int i = 0; i < 5; i++) { + reply = rpc.sendRequest(r); + Assert.assertEquals(client.getId(), reply.getClientId()); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertFalse(reply.isSuccess()); + Assert.assertNotNull(reply.getStateMachineException()); + } + + long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex(); + // make sure retry cache has the entry + for (RaftServerImpl server : cluster.getServers()) { + LOG.info("check server " + server.getId()); + if (server.getState().getLastAppliedIndex() < leaderApplied) { + Thread.sleep(1000); + } + Assert.assertNotNull( + RaftServerTestUtil.getRetryEntry(server, client.getId(), callId)); + Assert.assertEquals(oldLastApplied + 1, + server.getState().getLastAppliedIndex()); + } + + cluster.shutdown(); + } + + @Test + public void testRetryOnExceptionDuringReplication() throws Exception { + cluster.restart(true); + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + RaftClient client = cluster.createClient(leaderId); + try { + client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready")); + } catch (Exception ignored) { + } + + // turn on the preAppend failure switch + failPreAppend = true; + final RaftClientRpc rpc = client.getClientRpc(); + final long callId = 999; + RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, + callId, new RaftTestUtil.SimpleMessage("message")); + try { + rpc.sendRequest(r); + Assert.fail("Exception expected"); + } catch (Exception e) { + e.printStackTrace(); + } + + RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry( + cluster.getLeader(), client.getId(), callId); + Assert.assertNotNull(oldEntry); + Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry)); + + // retry + try { + rpc.sendRequest(r); + Assert.fail("Exception expected"); + } catch (Exception e) { + e.printStackTrace(); + } + RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry( + cluster.getLeader(), client.getId(), callId); + Assert.assertNotNull(currentEntry); + Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry)); + Assert.assertNotEquals(oldEntry, currentEntry); + + failPreAppend = false; + cluster.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java new file mode 100644 index 0000000..d709c1c --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftRetryCacheTests; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; +import org.junit.Assert; + +import java.io.IOException; + +public class TestRetryCacheWithGrpc extends RaftRetryCacheTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithGRpc cluster; + + public TestRetryCacheWithGrpc() throws IOException { + cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster( + NUM_SERVERS, properties); + Assert.assertNull(cluster.getLeader()); + } + + @Override + public MiniRaftClusterWithGRpc getCluster() { + return cluster; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java new file mode 100644 index 0000000..fade34b --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRetryCacheWithHadoopRpc.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftRetryCacheTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; + +import java.io.IOException; + +public class TestRetryCacheWithHadoopRpc extends RaftRetryCacheTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithHadoopRpc cluster; + + public TestRetryCacheWithHadoopRpc() throws IOException { + cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster( + NUM_SERVERS, getProperties()); + } + + @Override + public MiniRaftClusterWithHadoopRpc getCluster() { + return cluster; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java new file mode 100644 index 0000000..540cc16 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRetryCacheWithNettyRpc.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftRetryCacheTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; + +import java.io.IOException; + +public class TestRetryCacheWithNettyRpc extends RaftRetryCacheTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithNetty cluster; + + public TestRetryCacheWithNettyRpc() throws IOException { + cluster = MiniRaftClusterWithNetty.FACTORY.newCluster( + NUM_SERVERS, getProperties()); + } + + @Override + public MiniRaftClusterWithNetty getCluster() { + return cluster; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 66e06e0..be0c6bc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.server; +import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static org.apache.ratis.conf.ConfUtils.*; @@ -185,6 +187,25 @@ public interface RaftServerConfigKeys { } } + /** server retry cache related */ + interface RetryCache { + String PREFIX = RaftServerConfigKeys.PREFIX + ".retrycache"; + + String CAPACITY_KEY = PREFIX + ".capacity"; + int CAPACITY_DEFAULT = 4096; + static int capacity(RaftProperties properties) { + return ConfUtils.getInt(properties::getInt, CAPACITY_KEY, CAPACITY_DEFAULT, + ConfUtils.requireMin(0)); + } + + String EXPIRYTIME_KEY = PREFIX + ".expirytime"; + TimeDuration EXPIRYTIME_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS); + static TimeDuration expiryTime(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(EXPIRYTIME_DEFAULT.getUnit()), + EXPIRYTIME_KEY, EXPIRYTIME_DEFAULT); + } + } + static void main(String[] args) { printAll(RaftServerConfigKeys.class); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index f6c9ade..e7704fc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -516,9 +515,8 @@ public class LeaderState { return pending; } - void replyPendingRequest(long logIndex, - CompletableFuture<Message> stateMachineFuture) { - pendingRequests.replyPendingRequest(logIndex, stateMachineFuture); + void replyPendingRequest(long logIndex, RaftClientReply reply) { + pendingRequests.replyPendingRequest(logIndex, reply); } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index f5581b9..1d51e54 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -60,6 +60,9 @@ public class PendingRequest implements Comparable<PendingRequest> { return entry; } + /** + * This is only used when setting new raft configuration. + */ synchronized void setException(Throwable e) { RaftUtils.assertTrue(e != null); future.completeExceptionally(e); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index d4b74f2..c56d7a8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -88,23 +88,11 @@ class PendingRequests { return pendingRequest != null ? pendingRequest.getEntry() : null; } - void replyPendingRequest(long index, - CompletableFuture<Message> stateMachineFuture) { + void replyPendingRequest(long index, RaftClientReply reply) { final PendingRequest pending = pendingRequests.get(index); if (pending != null) { RaftUtils.assertTrue(pending.getIndex() == index); - - stateMachineFuture.whenComplete((reply, exception) -> { - if (exception == null) { - pending.setSuccessReply(reply); - } else { - // the exception is coming from the state machine. wrap it into the - // reply as a StateMachineException - final StateMachineException e = new StateMachineException( - server.getId().toString(), exception); - pending.setReply(new RaftClientReply(pending.getRequest(), e)); - } - }); + pending.setReply(reply); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 082c1bf..ca1b6d3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -27,6 +27,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; @@ -35,6 +36,7 @@ import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.RaftUtils; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,8 @@ public class RaftServerImpl implements RaftServer { private final ServerFactory factory; + private final RetryCache retryCache; + RaftServerImpl(RaftPeerId id, StateMachine stateMachine, RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) throws IOException { @@ -103,6 +107,13 @@ public class RaftServerImpl implements RaftServer { final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); this.serverRpc = initRaftServerRpc(); + retryCache = initRetryCache(properties); + } + + private RetryCache initRetryCache(RaftProperties prop) { + final int capacity = RaftServerConfigKeys.RetryCache.capacity(prop); + final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop); + return new RetryCache(capacity, expireTime); } @Override @@ -133,6 +144,11 @@ public class RaftServerImpl implements RaftServer { return this.stateMachine; } + @VisibleForTesting + RetryCache getRetryCache() { + return retryCache; + } + private RaftServerRpc initRaftServerRpc() { final RaftServerRpc rpc = getFactory().newRaftServerRpc(this); // add peers into rpc service @@ -322,17 +338,26 @@ public class RaftServerImpl implements RaftServer { * @return null if the server is in leader state. */ private CompletableFuture<RaftClientReply> checkLeaderState( - RaftClientRequest request) { + RaftClientRequest request, RetryCache.CacheEntry entry) { if (!isLeader()) { NotLeaderException exception = generateNotLeaderException(); - CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.complete(new RaftClientReply(request, exception)); - return future; + final RaftClientReply reply = new RaftClientReply(request, exception); + if (entry != null) { + entry.failWithReply(reply); + } + return entry != null ? + entry.getReplyFuture() : CompletableFuture.completedFuture(reply); } else { if (leaderState == null || !leaderState.isReady()) { - CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.completeExceptionally(new LeaderNotReadyException()); - return future; + final Exception e = new LeaderNotReadyException(); + if (entry != null) { + entry.failWithException(e); + return entry.getReplyFuture(); + } else { + CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } } } return null; @@ -360,15 +385,15 @@ public class RaftServerImpl implements RaftServer { * Handle a normal update request from client. */ private CompletableFuture<RaftClientReply> appendTransaction( - RaftClientRequest request, TransactionContext entry) - throws RaftException { + RaftClientRequest request, TransactionContext context, + RetryCache.CacheEntry retryEntry) throws RaftException { LOG.debug("{}: receive client request({})", getId(), request); lifeCycle.assertCurrentState(RUNNING); CompletableFuture<RaftClientReply> reply; final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request); + reply = checkLeaderState(request, retryEntry); if (reply != null) { return reply; } @@ -376,14 +401,19 @@ public class RaftServerImpl implements RaftServer { // append the message to its local log final long entryIndex; try { - entryIndex = state.applyLog(entry, request.getClientId(), + entryIndex = state.applyLog(context, request.getClientId(), request.getCallId()); } catch (IOException e) { - throw new RaftException(e); + // TODO looks like the IOException is actually only thrown by the SM in + // the preAppend stage. In that case we should wrap the exception in + // StateMachineException and return the exception in a RaftClientReply. + RaftException re = new RaftException(e); + retryEntry.failWithException(re); + throw re; } // put the request into the pending queue - pending = leaderState.addPendingRequest(entryIndex, request, entry); + pending = leaderState.addPendingRequest(entryIndex, request, context); leaderState.notifySenders(); } return pending.getFuture(); @@ -393,27 +423,41 @@ public class RaftServerImpl implements RaftServer { public CompletableFuture<RaftClientReply> submitClientRequestAsync( RaftClientRequest request) throws IOException { // first check the server's leader state - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); + CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null); if (reply != null) { return reply; } // let the state machine handle read-only request from client if (request.isReadOnly()) { - // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper, - // section 8 (last part) + // TODO: We might not be the leader anymore by the time this completes. + // See the RAFT paper section 8 (last part) return stateMachine.query(request); } - // TODO: this client request will not be added to pending requests - // until later which means that any failure in between will leave partial state in the - // state machine. We should call cancelTransaction() for failed requests - TransactionContext entry = stateMachine.startTransaction(request); - if (entry.getException().isPresent()) { - throw RaftUtils.asIOException(entry.getException().get()); + // query the retry cache + RetryCache.CacheQueryResult previousResult = retryCache.queryCache( + request.getClientId(), request.getCallId()); + if (previousResult.isRetry()) { + // if the previous attempt is still pending or it succeeded, return its + // future + return previousResult.getEntry().getReplyFuture(); + } + final RetryCache.CacheEntry cacheEntry = previousResult.getEntry(); + + // TODO: this client request will not be added to pending requests until + // later which means that any failure in between will leave partial state in + // the state machine. We should call cancelTransaction() for failed requests + TransactionContext context = stateMachine.startTransaction(request); + if (context.getException().isPresent()) { + RaftClientReply exceptionReply = new RaftClientReply(request, + new StateMachineException(getId().toString(), + context.getException().get())); + cacheEntry.failWithReply(exceptionReply); + return CompletableFuture.completedFuture(exceptionReply); } - return appendTransaction(request, entry); + return appendTransaction(request, context, cacheEntry); } @Override @@ -459,7 +503,7 @@ public class RaftServerImpl implements RaftServer { SetConfigurationRequest request) throws IOException { LOG.debug("{}: receive setConfiguration({})", getId(), request); lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); + CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null); if (reply != null) { return reply; } @@ -467,7 +511,7 @@ public class RaftServerImpl implements RaftServer { final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request); + reply = checkLeaderState(request, null); if (reply != null) { return reply; } @@ -799,11 +843,41 @@ public class RaftServerImpl implements RaftServer { } } - synchronized void replyPendingRequest(long logIndex, + /** + * The log has been submitted to the state machine. Use the future to update + * the pending requests and retry cache. + * @param logEntry the log entry that has been submitted to the state machine + * @param stateMachineFuture the future returned by the state machine + * from which we will get transaction result later + */ + void replyPendingRequest(LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) { - if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logIndex, stateMachineFuture); - } + // update the retry cache + final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray()); + final long callId = logEntry.getCallId(); + final RaftPeerId serverId = getId(); + final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry( + clientId, logEntry.getCallId()); + stateMachineFuture.whenComplete((reply, exception) -> { + final RaftClientReply r; + if (exception == null) { + r = new RaftClientReply(clientId, serverId, callId, true, reply, null); + } else { + // the exception is coming from the state machine. wrap it into the + // reply as a StateMachineException + final StateMachineException e = new StateMachineException( + getId().toString(), exception); + r = new RaftClientReply(clientId, serverId, callId, false, null, e); + } + // update retry cache + cacheEntry.updateResult(r); + // update pending request + synchronized (RaftServerImpl.this) { + if (isLeader() && leaderState != null) { // is leader and is running + leaderState.replyPendingRequest(logEntry.getIndex(), r); + } + } + }); } TransactionContext getTransactionContext(long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java new file mode 100644 index 0000000..0372558 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.shaded.com.google.common.cache.Cache; +import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder; +import org.apache.ratis.util.RaftUtils; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class RetryCache implements Closeable { + static final Logger LOG = LoggerFactory.getLogger(RetryCache.class); + private static final int MIN_CAPACITY = 128; + + static class CacheKey { + private final ClientId clientId; + private final long callId; + + CacheKey(ClientId clientId, long callId) { + this.clientId = clientId; + this.callId = callId; + } + + @Override + public int hashCode() { + return clientId.hashCode() ^ Long.hashCode(callId); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof CacheKey) { + CacheKey e = (CacheKey) obj; + return e.clientId.equals(clientId) && callId == e.callId; + } + return false; + } + + @Override + public String toString() { + return clientId.toString() + ":" + this.callId; + } + } + + /** + * CacheEntry is tracked using unique client ID and callId of the RPC request + */ + @VisibleForTesting + public static class CacheEntry { + private final CacheKey key; + private final CompletableFuture<RaftClientReply> replyFuture = + new CompletableFuture<>(); + + /** + * "failed" means we failed to commit the request into the raft group, or + * the request did not get approved by the state machine before the raft + * replication. Not once the request gets committed by the raft group, this + * field is never true even if the state machine throws an exception when + * applying the transaction. + */ + private volatile boolean failed = false; + + CacheEntry(CacheKey key) { + this.key = key; + } + + @Override + public String toString() { + return key + ":" + (isDone() ? "done" : "pending"); + } + + boolean isDone() { + return isFailed() || replyFuture.isDone(); + } + + void updateResult(RaftClientReply reply) { + assert !replyFuture.isDone() && !replyFuture.isCancelled(); + replyFuture.complete(reply); + } + + boolean isFailed() { + return failed || replyFuture.isCompletedExceptionally(); + } + + void failWithReply(RaftClientReply reply) { + failed = true; + replyFuture.complete(reply); + } + + void failWithException(Throwable t) { + failed = true; + replyFuture.completeExceptionally(t); + } + + CompletableFuture<RaftClientReply> getReplyFuture() { + return replyFuture; + } + } + + static class CacheQueryResult { + private final CacheEntry entry; + private final boolean isRetry; + + CacheQueryResult(CacheEntry entry, boolean isRetry) { + this.entry = entry; + this.isRetry = isRetry; + } + + public CacheEntry getEntry() { + return entry; + } + + public boolean isRetry() { + return isRetry; + } + } + + private final Cache<CacheKey, CacheEntry> cache; + + /** + * @param capacity the capacity of the cache + * @param expirationTime time for an entry to expire in milliseconds + */ + RetryCache(int capacity, TimeDuration expirationTime) { + capacity = Math.max(capacity, MIN_CAPACITY); + cache = CacheBuilder.newBuilder().maximumSize(capacity) + .expireAfterWrite(expirationTime.toLong(TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS).build(); + } + + CacheEntry getOrCreateEntry(ClientId clientId, long callId) { + final CacheKey key = new CacheKey(clientId, callId); + final CacheEntry entry; + try { + entry = cache.get(key, () -> new CacheEntry(key)); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + RaftUtils.assertTrue(entry != null && !entry.isDone(), + "retry cache entry should be pending: %s", entry); + return entry; + } + + private CacheEntry refreshEntry(CacheEntry newEntry) { + cache.put(newEntry.key, newEntry); + return newEntry; + } + + CacheQueryResult queryCache(ClientId clientId, long callId) { + CacheKey key = new CacheKey(clientId, callId); + final CacheEntry newEntry = new CacheEntry(key); + CacheEntry cacheEntry; + try { + cacheEntry = cache.get(key, () -> newEntry); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + + if (cacheEntry == newEntry) { + // this is the entry we just newly created + return new CacheQueryResult(cacheEntry, false); + } else if (!cacheEntry.isDone() || !cacheEntry.isFailed()){ + // the previous attempt is either pending or successful + return new CacheQueryResult(cacheEntry, true); + } + + // the previous attempt failed, replace it with a new one. + synchronized (this) { + // need to recheck, since there may be other retry attempts being + // processed at the same time. The recheck+replacement should be protected + // by lock. + CacheEntry currentEntry = cache.getIfPresent(key); + if (currentEntry == cacheEntry || currentEntry == null) { + // if the failed entry has not got replaced by another retry, or the + // failed entry got invalidated, we add a new cache entry + return new CacheQueryResult(refreshEntry(newEntry), false); + } else { + return new CacheQueryResult(currentEntry, true); + } + } + } + + @VisibleForTesting + long size() { + return cache.size(); + } + + @VisibleForTesting + CacheEntry get(ClientId clientId, long callId) { + return cache.getIfPresent(new CacheKey(clientId, callId)); + } + + @Override + public synchronized void close() { + if (cache != null) { + cache.invalidateAll(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index b4fc705..38436a2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -164,7 +164,7 @@ class StateMachineUpdater implements Runnable { // TODO: This step can be parallelized CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx); - server.replyPendingRequest(next.getIndex(), stateMachineFuture); + server.replyPendingRequest(next, stateMachineFuture); } lastAppliedIndex++; } else { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java new file mode 100644 index 0000000..66c54cb --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/RaftRetryCacheTests.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.MiniRaftCluster.PeerChanges; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static java.util.Arrays.asList; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; + +public abstract class RaftRetryCacheTests { + public static final Logger LOG = LoggerFactory.getLogger(RaftRetryCacheTests.class); + + public static final int NUM_SERVERS = 3; + protected static final RaftProperties properties = new RaftProperties(); + + public abstract MiniRaftCluster getCluster(); + + public RaftProperties getProperties() { + return properties; + } + + @Rule + public Timeout globalTimeout = new Timeout(120 * 1000); + + @Before + public void setup() throws IOException { + Assert.assertNull(getCluster().getLeader()); + getCluster().start(); + } + + @After + public void tearDown() { + final MiniRaftCluster cluster = getCluster(); + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * make sure the retry cache can correct capture the retry from a client, + * and returns the result from the previous request + */ + @Test + public void testBasicRetry() throws Exception { + final MiniRaftCluster cluster = getCluster(); + RaftTestUtil.waitForLeader(cluster); + + final RaftPeerId leaderId = cluster.getLeader().getId(); + RaftClient client = cluster.createClient(leaderId); + client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready")); + long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); + + final RaftClientRpc rpc = client.getClientRpc(); + final long callId = 999; + RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, + callId, new RaftTestUtil.SimpleMessage("message")); + RaftClientReply reply = rpc.sendRequest(r); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertTrue(reply.isSuccess()); + + // retry with the same callId + for (int i = 0; i < 5; i++) { + reply = rpc.sendRequest(r); + Assert.assertEquals(client.getId(), reply.getClientId()); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertTrue(reply.isSuccess()); + } + + long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex(); + // make sure retry cache has the entry + for (RaftServerImpl server : cluster.getServers()) { + LOG.info("check server " + server.getId()); + if (server.getState().getLastAppliedIndex() < leaderApplied) { + Thread.sleep(1000); + } + Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server)); + Assert.assertNotNull( + RaftServerTestUtil.getRetryEntry(server, client.getId(), callId)); + // make sure there is only one log entry committed + Assert.assertEquals(oldLastApplied + 1, + server.getState().getLastAppliedIndex()); + } + } + + /** + * Test retry while the leader changes to another peer + */ + @Test + public void testRetryOnNewLeader() throws Exception { + final MiniRaftCluster cluster = getCluster(); + RaftTestUtil.waitForLeader(cluster); + + final RaftPeerId leaderId = cluster.getLeader().getId(); + RaftClient client = cluster.createClient(leaderId); + client.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready")); + + RaftClientRpc rpc = client.getClientRpc(); + final long callId = 999; + RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, + callId, new RaftTestUtil.SimpleMessage("message")); + RaftClientReply reply = rpc.sendRequest(r); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertTrue(reply.isSuccess()); + long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex(); + + // trigger the reconfiguration, make sure the original leader is kicked out + PeerChanges change = cluster.addNewPeers(2, true); + RaftPeer[] allPeers = cluster.removePeers(2, true, + asList(change.newPeers)).allPeersInNewConf; + // trigger setConfiguration + SetConfigurationRequest request = new SetConfigurationRequest( + client.getId(), cluster.getLeader().getId(), DEFAULT_CALLID, allPeers); + LOG.info("Start changing the configuration: {}", request); + cluster.getLeader().setConfiguration(request); + + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId newLeaderId = cluster.getLeader().getId(); + Assert.assertNotEquals(leaderId, newLeaderId); + // same clientId and callId in the request + r = new RaftClientRequest(client.getId(), newLeaderId, + callId, new RaftTestUtil.SimpleMessage("message")); + for (int i = 0; i < 10; i++) { + try { + reply = rpc.sendRequest(r); + LOG.info("successfully sent out the retry request_" + i); + Assert.assertEquals(client.getId(), reply.getClientId()); + Assert.assertEquals(callId, reply.getCallId()); + Assert.assertTrue(reply.isSuccess()); + } catch (Exception e) { + LOG.info("hit exception while retrying the same request: " + e); + } + Thread.sleep(100); + } + + // check the new leader and make sure the retry did not get committed + Assert.assertEquals(oldLastApplied + 3, + cluster.getLeader().getState().getLastAppliedIndex()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 012cfd7..0762f21 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -19,6 +19,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeer; import org.junit.Assert; import org.slf4j.Logger; @@ -69,4 +70,17 @@ public class RaftServerTestUtil { RaftConfiguration initialConf) { return new ConfigurationManager(initialConf); } + + public static long getRetryCacheSize(RaftServerImpl server) { + return server.getRetryCache().size(); + } + + public static RetryCache.CacheEntry getRetryEntry(RaftServerImpl server, + ClientId clientId, long callId) { + return server.getRetryCache().get(clientId, callId); + } + + public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) { + return entry.isFailed(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42fff2b2/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java new file mode 100644 index 0000000..5e896e2 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftRetryCacheTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; + +import java.io.IOException; + +public class TestRetryCacheWithSimulatedRpc extends RaftRetryCacheTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithSimulatedRpc cluster; + + public TestRetryCacheWithSimulatedRpc() throws IOException { + cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster( + NUM_SERVERS, getProperties()); + } + + @Override + public MiniRaftClusterWithSimulatedRpc getCluster() { + return cluster; + } +}
