Repository: incubator-ratis Updated Branches: refs/heads/master 298c1a2de -> a783abd0d
RATIS-114. TestRaftWithHadoopRpc.testWithLoad may timeout. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a783abd0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a783abd0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a783abd0 Branch: refs/heads/master Commit: a783abd0d253d2082c5db6ed8ef485486ebb3c7a Parents: 298c1a2 Author: Jing Zhao <[email protected]> Authored: Fri Sep 15 11:58:21 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Fri Sep 15 11:58:21 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClientRpc.java | 7 ++ .../ratis/client/impl/RaftClientImpl.java | 6 +- .../client/impl/RaftClientRpcWithProxy.java | 68 ++++++++++ .../ratis/protocol/RaftClientMessage.java | 2 +- .../ratis/protocol/RaftClientRequest.java | 2 +- .../java/org/apache/ratis/util/JavaUtils.java | 9 ++ .../org/apache/ratis/util/PeerProxyMap.java | 13 +- .../org/apache/ratis/TestMultiRaftGroup.java | 2 +- .../ratis/grpc/client/AppendStreamer.java | 1 + .../apache/ratis/grpc/client/GrpcClientRpc.java | 22 +--- .../ratis/hadooprpc/HadoopConfigKeys.java | 4 + .../ratis/hadooprpc/client/HadoopClientRpc.java | 23 +--- .../hadooprpc/server/HadoopRpcService.java | 3 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 5 + .../ratis/hadooprpc/TestRaftWithHadoopRpc.java | 12 +- .../org/apache/ratis/netty/NettyFactory.java | 1 - .../ratis/netty/client/NettyClientRpc.java | 20 +-- .../ratis/netty/server/NettyRpcService.java | 2 +- .../apache/ratis/server/impl/LeaderState.java | 1 + .../ratis/server/impl/RaftServerImpl.java | 4 +- .../ratis/server/impl/RaftServerProxy.java | 1 + .../java/org/apache/ratis/MiniRaftCluster.java | 15 ++- .../java/org/apache/ratis/RaftBasicTests.java | 125 ++++++++++++++----- .../java/org/apache/ratis/RaftTestUtil.java | 8 -- .../impl/BlockRequestHandlingInjection.java | 29 +++-- .../impl/DelayLocalExecutionInjection.java | 5 + .../server/simulation/SimulatedClientRpc.java | 11 ++ 27 files changed, 286 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java index ca1864b..afe058c 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -23,12 +23,19 @@ import java.io.IOException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; /** The client side rpc of a raft service. */ public interface RaftClientRpc extends Closeable { + /** Set the client name */ + void setName(String name); + /** Send a request. */ RaftClientReply sendRequest(RaftClientRequest request) throws IOException; /** Add the information of the given raft servers */ void addServers(Iterable<RaftPeer> servers); + + /** Handle the given exception. For example, try reconnecting. */ + void handleException(RaftPeerId serverId, Exception e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 30baee6..8a0ddef 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -147,7 +147,7 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException, GroupMismatchException { - LOG.debug("{}: {}", clientId, request); + LOG.debug("{}: send {}", clientId, request); RaftClientReply reply = null; try { reply = clientRpc.sendRequest(request); @@ -157,7 +157,7 @@ final class RaftClientImpl implements RaftClient { handleIOException(request, ioe, null); } if (reply != null) { - LOG.debug("{}: {}", clientId, reply); + LOG.debug("{}: receive {}", clientId, reply); if (reply.isNotLeader()) { handleNotLeaderException(request, reply.getNotLeaderException()); return null; @@ -196,6 +196,8 @@ final class RaftClientImpl implements RaftClient { } final RaftPeerId oldLeader = request.getServerId(); + clientRpc.handleException(oldLeader, ioe); + if (newLeader == null && oldLeader.equals(leaderId)) { newLeader = CollectionUtils.random(oldLeader, CollectionUtils.as(peers, RaftPeer::getId)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java new file mode 100644 index 0000000..4294249 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client.impl; + +import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.ReflectionUtils; + +import java.io.Closeable; +import java.io.EOFException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; + +/** An abstract {@link RaftClientRpc} implementation using {@link PeerProxyMap}. */ +public abstract class RaftClientRpcWithProxy<PROXY extends Closeable> + implements RaftClientRpc { + private final PeerProxyMap<PROXY> proxies; + + protected RaftClientRpcWithProxy(PeerProxyMap<PROXY> proxies) { + this.proxies = proxies; + } + + public PeerProxyMap<PROXY> getProxies() { + return proxies; + } + + @Override + public void setName(String name) { + proxies.setName(name); + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void handleException(RaftPeerId serverId, Exception e) { + if (ReflectionUtils.isInstance(e, + SocketException.class, SocketTimeoutException.class, + ClosedChannelException.class, EOFException.class)) { + proxies.resetProxy(serverId); + } + } + + @Override + public void close() { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java index 49f2b6f..07354d4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java @@ -55,6 +55,6 @@ public abstract class RaftClientMessage implements RaftRpcMessage { @Override public String toString() { return getClass().getSimpleName() + "(" + clientId + "->" + serverId - + ") in raft group " + groupId; + + ") in " + groupId; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 41bdb2e..63e482d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -58,6 +58,6 @@ public class RaftClientRequest extends RaftClientMessage { @Override public String toString() { return super.toString() + ", callId: " + callId + ", " - + (isReadOnly()? "RO": "RW"); + + (isReadOnly()? "RO": "RW") + ", " + getMessage(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 9d4a6e5..ed115cb 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -23,6 +23,9 @@ package org.apache.ratis.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -168,4 +171,10 @@ public interface JavaUtils { } } + static void dumpAllThreads(Consumer<String> println) { + final ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); + for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) { + println.accept(ti.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java index 3319a6a..856e98e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java @@ -66,14 +66,15 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { try { proxy.close(); } catch (IOException e) { - LOG.warn("Failed to close proxy for peer {}, proxy class: ", - peer, proxy.getClass()); + LOG.warn("{}: Failed to close proxy for peer {}, proxy class: ", + name, peer, proxy.getClass()); } } }); } } + private volatile String name; private final Map<RaftPeerId, PeerAndProxy> peers = new ConcurrentHashMap<>(); private final Object resetLock = new Object(); @@ -82,16 +83,21 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { public PeerProxyMap(CheckedFunction<RaftPeer, PROXY, IOException> createProxy) { this.createProxy = createProxy; } + public PeerProxyMap() { this.createProxy = this::createProxyImpl; } + public void setName(String name) { + this.name = name; + } + public PROXY getProxy(RaftPeerId id) throws IOException { PeerAndProxy p = peers.get(id); if (p == null) { synchronized (resetLock) { p = Objects.requireNonNull(peers.get(id), - "Server " + id + " not found: peers=" + peers.keySet()); + () -> name + ": Server " + id + " not found: peers=" + peers.keySet()); } } return p.getProxy(); @@ -108,6 +114,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { } public void resetProxy(RaftPeerId id) { + LOG.debug("{}: reset proxy for {}", name, id ); synchronized (resetLock) { final PeerAndProxy pp = peers.remove(id); final RaftPeer peer = pp.getPeer(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index 8d3966e..9c76bfb 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; @RunWith(Parameterized.class) public class TestMultiRaftGroup extends BaseTest { static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE); + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 36a588e..4bd3fe0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -96,6 +96,7 @@ public class AppendStreamer implements Closeable { Collectors.toMap(RaftPeer::getId, Function.identity())); proxyMap = new PeerProxyMap<>( raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new)); + proxyMap.setName(clientId.toString()); proxyMap.addPeers(group.getPeers()); refreshLeaderProxy(leaderId, null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 2a27a75..23ac7b8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -17,8 +17,8 @@ */ package org.apache.ratis.grpc.client; -import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; @@ -29,7 +29,6 @@ import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.PeerProxyMap; -import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,17 +39,18 @@ import java.util.concurrent.ExecutionException; import static org.apache.ratis.client.impl.ClientProtoUtils.*; -public class GrpcClientRpc implements RaftClientRpc { +public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); - private final PeerProxyMap<RaftClientProtocolClient> proxies - = new PeerProxyMap<>(RaftClientProtocolClient::new); + public GrpcClientRpc() { + super(new PeerProxyMap<>(RaftClientProtocolClient::new)); + } @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); - final RaftClientProtocolClient proxy = proxies.getProxy(serverId); + final RaftClientProtocolClient proxy = getProxies().getProxy(serverId); if (request instanceof ReinitializeRequest) { RaftProtos.ReinitializeRequestProto proto = toReinitializeRequestProto((ReinitializeRequest) request); @@ -110,14 +110,4 @@ public class GrpcClientRpc implements RaftClientRpc { } } } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() throws IOException { - proxies.close(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java index 5d66480..a838416 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java @@ -55,6 +55,10 @@ public interface HadoopConfigKeys { HANDLERS_KEY, HANDLERS_DEFAULT, requireMin(1)); } + static void setHandlers(Configuration conf, int handers) { + set(conf::setInt, HANDLERS_KEY, handers); + } + static InetSocketAddress address(Configuration conf) { return getInetSocketAddress(conf::getTrimmed, ADDRESS_KEY, ADDRESS_DEFAULT); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java index b35d76b..83f4869 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java @@ -19,19 +19,16 @@ package org.apache.ratis.hadooprpc.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; -import org.apache.ratis.client.RaftClientRpc; +import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.protocol.*; import org.apache.ratis.util.PeerProxyMap; import java.io.IOException; -public class HadoopClientRpc implements RaftClientRpc { - - private final PeerProxyMap<CombinedClientProtocolClientSideTranslatorPB> proxies; - +public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtocolClientSideTranslatorPB> { public HadoopClientRpc(final Configuration conf) { - this.proxies = new PeerProxyMap<>( - p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); + super(new PeerProxyMap<>( + p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf))); } @Override @@ -39,7 +36,7 @@ public class HadoopClientRpc implements RaftClientRpc { throws IOException { final RaftPeerId serverId = request.getServerId(); final CombinedClientProtocolClientSideTranslatorPB proxy = - proxies.getProxy(serverId); + getProxies().getProxy(serverId); try { if (request instanceof ReinitializeRequest) { return proxy.reinitialize((ReinitializeRequest) request); @@ -60,14 +57,4 @@ public class HadoopClientRpc implements RaftClientRpc { GroupMismatchException.class); } } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() { - proxies.close(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 0a7b430..0512e8c 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -91,9 +91,10 @@ public class HadoopRpcService implements RaftServerRpc { private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies; private HadoopRpcService(RaftServer server, final Configuration conf) { + this.id = server.getId(); this.proxies = new PeerProxyMap<>( p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), conf)); - this.id = server.getId(); + this.proxies.setName(id.toString()); try { this.ipcServer = newRpcServer(server, conf); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 51b1d5d..44a1900 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -96,4 +96,9 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest, leaderId, delayMs, getMaxTimeout()); } + + @Override + public String toString() { + return getClass().getSimpleName() + ": sendServerRequest=" + sendServerRequest; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java index 124e7ee..1c21242 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java @@ -17,11 +17,11 @@ */ package org.apache.ratis.hadooprpc; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.log4j.Level; import org.apache.ratis.RaftBasicTests; -import org.apache.ratis.client.RaftClient; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.util.LogUtils; import org.junit.Test; @@ -31,16 +31,18 @@ import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServer public class TestRaftWithHadoopRpc extends RaftBasicTests { static { - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); } private final MiniRaftClusterWithHadoopRpc cluster; public TestRaftWithHadoopRpc() throws IOException { + final Configuration conf = new Configuration(); + HadoopConfigKeys.Ipc.setHandlers(conf, 20); + conf.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000); cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster( - NUM_SERVERS, getProperties()); + NUM_SERVERS, getProperties(), conf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index 31f127b..b53e94a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -23,7 +23,6 @@ import org.apache.ratis.netty.client.NettyClientRpc; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index 8a1af8a..1526725 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -17,8 +17,8 @@ */ package org.apache.ratis.netty.client; -import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.netty.NettyRpcProxy; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.proto.RaftProtos; @@ -30,13 +30,15 @@ import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestPro import java.io.IOException; -public class NettyClientRpc implements RaftClientRpc { - private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); +public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> { + public NettyClientRpc() { + super(new NettyRpcProxy.PeerMap()); + } @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); - final NettyRpcProxy proxy = proxies.getProxy(serverId); + final NettyRpcProxy proxy = getProxies().getProxy(serverId); final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); final RaftRpcRequestProto rpcRequest; @@ -68,14 +70,4 @@ public class NettyClientRpc implements RaftClientRpc { proxy.send(rpcRequest, b.build()).getRaftClientReply()); } } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() { - proxies.close(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 3461254..6e9448a 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -76,7 +76,6 @@ public final class NettyRpcService implements RaftServerRpc { return new Builder(); } - private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); private final RaftServer server; private final RaftPeerId id; @@ -100,6 +99,7 @@ public final class NettyRpcService implements RaftServerRpc { private NettyRpcService(RaftServer server) { this.server = server; this.id = server.getId(); + this.proxies.setName(id.toString()); final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index e711774..a3974d5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -236,6 +236,7 @@ public class LeaderState { PendingRequest addPendingRequest(long index, RaftClientRequest request, TransactionContext entry) { + LOG.debug("{}: addPendingRequest at index={}, request={}", server.getId(), index, request); return pendingRequests.addPendingRequest(index, request, entry); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 13ef7e8..2525fb0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -407,7 +407,6 @@ public class RaftServerImpl implements RaftServerProtocol, private CompletableFuture<RaftClientReply> appendTransaction( RaftClientRequest request, TransactionContext context, RetryCache.CacheEntry cacheEntry) throws IOException { - LOG.debug("{}: receive client request({})", getId(), request); assertLifeCycleState(RUNNING); CompletableFuture<RaftClientReply> reply; @@ -441,6 +440,7 @@ public class RaftServerImpl implements RaftServerProtocol, @Override public CompletableFuture<RaftClientReply> submitClientRequestAsync( RaftClientRequest request) throws IOException { + LOG.debug("{}: receive client request({})", getId(), request); // first check the server's leader state CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null); if (reply != null) { @@ -690,7 +690,7 @@ public class RaftServerImpl implements RaftServerProtocol, static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) { if (isHeartbeat) { if (LOG.isTraceEnabled()) { - LOG.trace(message.get()); + LOG.trace("HEARTBEAT: " + message.get()); } } else { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 0595027..0a16954 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -160,6 +160,7 @@ public class RaftServerProxy implements RaftServer { @Override public CompletableFuture<RaftClientReply> reinitializeAsync( ReinitializeRequest request) throws IOException { + LOG.info("{}: reinitializeAsync {}", getId(), request); getImpl().assertGroup(request.getRequestorId(), request.getRaftGroupId()); if (!reinitializeRequest.compareAndSet(null, request)) { throw new IOException("Another reinitialize is already in progress."); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 517af66..4423fdb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -19,6 +19,7 @@ package org.apache.ratis; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftGroup; @@ -28,6 +29,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.storage.MemoryRaftLog; @@ -75,7 +77,11 @@ public abstract class MiniRaftCluster { @Override public void setBlockRequestsFrom(String src, boolean block) { - RaftTestUtil.setBlockRequestsFrom(src, block); + if (block) { + BlockRequestHandlingInjection.getInstance().blockRequestor(src); + } else { + BlockRequestHandlingInjection.getInstance().unblockRequestor(src); + } } public static int getPort(RaftPeerId id, RaftGroup group) { @@ -430,12 +436,15 @@ public abstract class MiniRaftCluster { } public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) { - return RaftClient.newBuilder() + final RaftClientRpc rpc = clientFactory.newRaftClientRpc(); + final RaftClient client = RaftClient.newBuilder() .setRaftGroup(group) .setLeaderId(leaderId) - .setClientRpc(clientFactory.newRaftClientRpc()) + .setClientRpc(rpc) .setProperties(properties) .build(); + rpc.setName(client.getId().toString()); + return client; } public void shutdown() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 1c7ccd5..5464d4b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -17,24 +17,42 @@ */ package org.apache.ratis; +import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; -import org.junit.*; - -import static org.apache.ratis.RaftTestUtil.waitAndKillLeader; -import static org.apache.ratis.RaftTestUtil.waitForLeader; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.ratis.RaftTestUtil.waitAndKillLeader; +import static org.apache.ratis.RaftTestUtil.waitForLeader; + public abstract class RaftBasicTests extends BaseTest { + { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + public static final int NUM_SERVERS = 5; protected static final RaftProperties properties = new RaftProperties(); @@ -105,33 +123,52 @@ public abstract class RaftBasicTests extends BaseTest { waitForLeader(cluster, leader); } - static class Client4TestWithLoad extends Thread { - final RaftClient client; + class Client4TestWithLoad extends Thread { + final int index; final SimpleMessage[] messages; + final AtomicBoolean isRunning = new AtomicBoolean(true); final AtomicInteger step = new AtomicInteger(); - volatile Exception exceptionInClientThread; + final AtomicReference<Throwable> exceptionInClientThread = new AtomicReference<>(); - Client4TestWithLoad(RaftClient client, int numMessages) { - this.client = client; - this.messages = SimpleMessage.create(numMessages, client.getId().toString()); + Client4TestWithLoad(int index, int numMessages) { + super("client-" + index); + this.index = index; + this.messages = SimpleMessage.create(numMessages, index + "-"); } boolean isRunning() { - return step.get() < messages.length && exceptionInClientThread == null; + return isRunning.get(); } @Override public void run() { - try { - for (; isRunning(); ) { - client.send(messages[step.getAndIncrement()]); + try(RaftClient client = getCluster().createClient()) { + for (; step.get() < messages.length; ) { + final RaftClientReply reply = client.send(messages[step.getAndIncrement()]); + Assert.assertTrue(reply.isSuccess()); + } + } catch(Throwable t) { + if (exceptionInClientThread.compareAndSet(null, t)) { + LOG.error(this + " failed", t); + } else { + exceptionInClientThread.get().addSuppressed(t); + LOG.error(this + " failed again!", t); } - client.close(); - } catch (IOException ioe) { - exceptionInClientThread = ioe; + } finally { + isRunning.set(false); } } + + @Override + public String toString() { + return getClass().getSimpleName() + index + + "(step=" + step + "/" + messages.length + + ", isRunning=" + isRunning + + ", isAlive=" + isAlive() + + ", exception=" + exceptionInClientThread + + ")"; + } } @Test @@ -149,23 +186,56 @@ public abstract class RaftBasicTests extends BaseTest { final List<Client4TestWithLoad> clients = Stream.iterate(0, i -> i+1).limit(numClients) - .map(i -> cluster.createClient()) - .map(c -> new Client4TestWithLoad(c, numMessages)) + .map(i -> new Client4TestWithLoad(i, numMessages)) .collect(Collectors.toList()); + final AtomicInteger lastStep = new AtomicInteger(); + + final Timer timer = new Timer(); + timer.schedule(new TimerTask() { + private int previousLastStep = lastStep.get(); + + @Override + public void run() { + LOG.info(cluster.printServers()); + LOG.info(BlockRequestHandlingInjection.getInstance().toString()); + LOG.info(cluster.toString()); + clients.forEach(c -> LOG.info(" " + c)); + JavaUtils.dumpAllThreads(s -> LOG.info(s)); + + final int last = lastStep.get(); + if (last != previousLastStep) { + previousLastStep = last; + } else { + final RaftServerImpl leader = cluster.getLeader(); + LOG.info("NO PROGRESS at " + last + ", try to restart leader=" + leader); + if (leader != null) { + try { + cluster.restartServer(leader.getId(), false); + LOG.info("Restarted leader=" + leader); + } catch (IOException e) { + LOG.error("Failed to restart leader=" + leader); + } + } + } + } + }, 5_000L, 10_000L); + clients.forEach(Thread::start); int count = 0; - for(int lastStep = 0;; ) { + for(;; ) { if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) { break; } final int n = clients.stream().mapToInt(c -> c.step.get()).sum(); - if (n - lastStep < 50 * numClients) { // Change leader at least 50 steps. + Assert.assertTrue(n >= lastStep.get()); + + if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps. Thread.sleep(10); continue; } - lastStep = n; + lastStep.set(n); count++; RaftServerImpl leader = cluster.getLeader(); @@ -173,17 +243,14 @@ public abstract class RaftBasicTests extends BaseTest { RaftTestUtil.changeLeader(cluster, leader.getId()); } } + LOG.info("Leader change count=" + count); + timer.cancel(); for(Client4TestWithLoad c : clients) { - c.join(); - } - for(Client4TestWithLoad c : clients) { - if (c.exceptionInClientThread != null) { - throw new AssertionError(c.exceptionInClientThread); + if (c.exceptionInClientThread.get() != null) { + throw new AssertionError(c.exceptionInClientThread.get()); } RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages); } - - LOG.info("Leader change count=" + count + cluster.printAllLogs()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 65cb5fa..4fe9edc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -297,12 +297,4 @@ public interface RaftTestUtil { Thread.sleep(3 * maxTimeout); } - - static void setBlockRequestsFrom(String src, boolean block) { - if (block) { - BlockRequestHandlingInjection.getInstance().blockRequestor(src); - } else { - BlockRequestHandlingInjection.getInstance().unblockRequestor(src); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java index 3870e82..abe9dd5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java @@ -45,10 +45,12 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co private BlockRequestHandlingInjection() {} public void blockRequestor(String requestor) { + LOG.info("Block requestor " + requestor); requestors.put(requestor, true); } public void unblockRequestor(String requestor) { + LOG.info("UnBlock requestor " + requestor); requestors.remove(requestor); } @@ -67,20 +69,29 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co @Override public boolean execute(Object localId, Object remoteId, Object... args) { - if (shouldBlock(localId, remoteId)) { - try { - RaftTestUtil.block(() -> shouldBlock(localId, remoteId)); - return true; - } catch (InterruptedException e) { - LOG.debug("Interrupted while blocking request handling from " + remoteId - + " to " + localId); - } + if (!shouldBlock(localId, remoteId)) { + return false; } - return false; + + LOG.info(localId + ": Block request from " + remoteId); + try { + RaftTestUtil.block(() -> shouldBlock(localId, remoteId)); + } catch (InterruptedException e) { + LOG.debug("Interrupted while blocking request from " + remoteId + " to " + localId, e); + } + LOG.info(localId + ": unBlock request from " + remoteId); + return true; } private boolean shouldBlock(Object localId, Object remoteId) { return (localId != null && repliers.containsKey(localId.toString())) || (remoteId != null && requestors.containsKey(remoteId.toString())); } + + @Override + public String toString() { + return getClass().getSimpleName() + + ": requestors=" + requestors.keySet() + + ", repliers=" + repliers.keySet(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java index 6df6176..410228f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java @@ -70,4 +70,9 @@ public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Cod } return true; } + + @Override + public String toString() { + return getClass().getSimpleName() + ": delays=" + delays; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java index a62ec16..cb28b3c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java @@ -21,6 +21,7 @@ import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; class SimulatedClientRpc extends SimulatedRequestReply<RaftClientRequest, RaftClientReply> @@ -30,11 +31,21 @@ class SimulatedClientRpc } @Override + public void setName(String name) { + // do nothing + } + + @Override public void addServers(Iterable<RaftPeer> servers) { // do nothing } @Override + public void handleException(RaftPeerId serverId, Exception e) { + // do nothing + } + + @Override public void close() { // do nothing }
