Repository: incubator-ratis
Updated Branches:
  refs/heads/master 298c1a2de -> a783abd0d


RATIS-114. TestRaftWithHadoopRpc.testWithLoad may timeout. Contributed by Tsz 
Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a783abd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a783abd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a783abd0

Branch: refs/heads/master
Commit: a783abd0d253d2082c5db6ed8ef485486ebb3c7a
Parents: 298c1a2
Author: Jing Zhao <[email protected]>
Authored: Fri Sep 15 11:58:21 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Fri Sep 15 11:58:21 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClientRpc.java  |   7 ++
 .../ratis/client/impl/RaftClientImpl.java       |   6 +-
 .../client/impl/RaftClientRpcWithProxy.java     |  68 ++++++++++
 .../ratis/protocol/RaftClientMessage.java       |   2 +-
 .../ratis/protocol/RaftClientRequest.java       |   2 +-
 .../java/org/apache/ratis/util/JavaUtils.java   |   9 ++
 .../org/apache/ratis/util/PeerProxyMap.java     |  13 +-
 .../org/apache/ratis/TestMultiRaftGroup.java    |   2 +-
 .../ratis/grpc/client/AppendStreamer.java       |   1 +
 .../apache/ratis/grpc/client/GrpcClientRpc.java |  22 +---
 .../ratis/hadooprpc/HadoopConfigKeys.java       |   4 +
 .../ratis/hadooprpc/client/HadoopClientRpc.java |  23 +---
 .../hadooprpc/server/HadoopRpcService.java      |   3 +-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |   5 +
 .../ratis/hadooprpc/TestRaftWithHadoopRpc.java  |  12 +-
 .../org/apache/ratis/netty/NettyFactory.java    |   1 -
 .../ratis/netty/client/NettyClientRpc.java      |  20 +--
 .../ratis/netty/server/NettyRpcService.java     |   2 +-
 .../apache/ratis/server/impl/LeaderState.java   |   1 +
 .../ratis/server/impl/RaftServerImpl.java       |   4 +-
 .../ratis/server/impl/RaftServerProxy.java      |   1 +
 .../java/org/apache/ratis/MiniRaftCluster.java  |  15 ++-
 .../java/org/apache/ratis/RaftBasicTests.java   | 125 ++++++++++++++-----
 .../java/org/apache/ratis/RaftTestUtil.java     |   8 --
 .../impl/BlockRequestHandlingInjection.java     |  29 +++--
 .../impl/DelayLocalExecutionInjection.java      |   5 +
 .../server/simulation/SimulatedClientRpc.java   |  11 ++
 27 files changed, 286 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index ca1864b..afe058c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -23,12 +23,19 @@ import java.io.IOException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 
 /** The client side rpc of a raft service. */
 public interface RaftClientRpc extends Closeable {
+  /** Set the client name */
+  void setName(String name);
+
   /** Send a request. */
   RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
 
   /** Add the information of the given raft servers */
   void addServers(Iterable<RaftPeer> servers);
+
+  /** Handle the given exception.  For example, try reconnecting. */
+  void handleException(RaftPeerId serverId, Exception e);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 30baee6..8a0ddef 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -147,7 +147,7 @@ final class RaftClientImpl implements RaftClient {
 
   private RaftClientReply sendRequest(RaftClientRequest request)
       throws StateMachineException, GroupMismatchException {
-    LOG.debug("{}: {}", clientId, request);
+    LOG.debug("{}: send {}", clientId, request);
     RaftClientReply reply = null;
     try {
       reply = clientRpc.sendRequest(request);
@@ -157,7 +157,7 @@ final class RaftClientImpl implements RaftClient {
       handleIOException(request, ioe, null);
     }
     if (reply != null) {
-      LOG.debug("{}: {}", clientId, reply);
+      LOG.debug("{}: receive {}", clientId, reply);
       if (reply.isNotLeader()) {
         handleNotLeaderException(request, reply.getNotLeaderException());
         return null;
@@ -196,6 +196,8 @@ final class RaftClientImpl implements RaftClient {
     }
 
     final RaftPeerId oldLeader = request.getServerId();
+    clientRpc.handleException(oldLeader, ioe);
+
     if (newLeader == null && oldLeader.equals(leaderId)) {
       newLeader = CollectionUtils.random(oldLeader,
           CollectionUtils.as(peers, RaftPeer::getId));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
new file mode 100644
index 0000000..4294249
--- /dev/null
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.ReflectionUtils;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+
+/** An abstract {@link RaftClientRpc} implementation using {@link PeerProxyMap}. */
+public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
+    implements RaftClientRpc {
+  private final PeerProxyMap<PROXY> proxies;
+
+  protected RaftClientRpcWithProxy(PeerProxyMap<PROXY> proxies) {
+    this.proxies = proxies;
+  }
+
+  public PeerProxyMap<PROXY> getProxies() {
+    return proxies;
+  }
+
+  @Override
+  public void setName(String name) {
+    proxies.setName(name);
+  }
+
+  @Override
+  public void addServers(Iterable<RaftPeer> servers) {
+    proxies.addPeers(servers);
+  }
+
+  @Override
+  public void handleException(RaftPeerId serverId, Exception e) {
+    if (ReflectionUtils.isInstance(e,
+        SocketException.class, SocketTimeoutException.class,
+        ClosedChannelException.class, EOFException.class)) {
+      proxies.resetProxy(serverId);
+    }
+  }
+
+  @Override
+  public void close() {
+    proxies.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index 49f2b6f..07354d4 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -55,6 +55,6 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
   @Override
   public String toString() {
     return getClass().getSimpleName() + "(" + clientId + "->" + serverId
-        + ") in raft group " + groupId;
+        + ") in " + groupId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 41bdb2e..63e482d 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -58,6 +58,6 @@ public class RaftClientRequest extends RaftClientMessage {
   @Override
   public String toString() {
     return super.toString() + ", callId: " + callId + ", "
-        + (isReadOnly()? "RO": "RW");
+        + (isReadOnly()? "RO": "RW") + ", " + getMessage();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 9d4a6e5..ed115cb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -23,6 +23,9 @@ package org.apache.ratis.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -168,4 +171,10 @@ public interface JavaUtils {
     }
   }
 
+  static void dumpAllThreads(Consumer<String> println) {
+    final ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+    for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
+      println.accept(ti.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java 
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 3319a6a..856e98e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -66,14 +66,15 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
           try {
             proxy.close();
           } catch (IOException e) {
-            LOG.warn("Failed to close proxy for peer {}, proxy class: ",
-                peer, proxy.getClass());
+            LOG.warn("{}: Failed to close proxy for peer {}, proxy class: ",
+                name, peer, proxy.getClass());
           }
         }
       });
     }
   }
 
+  private volatile String name;
   private final Map<RaftPeerId, PeerAndProxy> peers = new ConcurrentHashMap<>();
   private final Object resetLock = new Object();
 
@@ -82,16 +83,21 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
   public PeerProxyMap(CheckedFunction<RaftPeer, PROXY, IOException> createProxy) {
     this.createProxy = createProxy;
   }
+
   public PeerProxyMap() {
     this.createProxy = this::createProxyImpl;
   }
 
+  public void setName(String name) {
+    this.name = name;
+  }
+
   public PROXY getProxy(RaftPeerId id) throws IOException {
     PeerAndProxy p = peers.get(id);
     if (p == null) {
       synchronized (resetLock) {
         p = Objects.requireNonNull(peers.get(id),
-            "Server " + id + " not found: peers=" + peers.keySet());
+            () -> name + ": Server " + id + " not found: peers=" + peers.keySet());
       }
     }
     return p.getProxy();
@@ -108,6 +114,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
   }
 
   public void resetProxy(RaftPeerId id) {
+    LOG.debug("{}: reset proxy for {}", name, id );
     synchronized (resetLock) {
       final PeerAndProxy pp = peers.remove(id);
       final RaftPeer peer = pp.getPeer();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java 
b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
index 8d3966e..9c76bfb 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RunWith(Parameterized.class)
 public class TestMultiRaftGroup extends BaseTest {
   static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.TRACE);
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
   }
 
   @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 36a588e..4bd3fe0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -96,6 +96,7 @@ public class AppendStreamer implements Closeable {
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(
         raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new));
+    proxyMap.setName(clientId.toString());
     proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 2a27a75..23ac7b8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -17,8 +17,8 @@
  */
 package org.apache.ratis.grpc.client;
 
-import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
@@ -29,7 +29,6 @@ import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.PeerProxyMap;
-import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,17 +39,18 @@ import java.util.concurrent.ExecutionException;
 
 import static org.apache.ratis.client.impl.ClientProtoUtils.*;
 
-public class GrpcClientRpc implements RaftClientRpc {
+public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
 
-  private final PeerProxyMap<RaftClientProtocolClient> proxies
-      = new PeerProxyMap<>(RaftClientProtocolClient::new);
+  public GrpcClientRpc() {
+    super(new PeerProxyMap<>(RaftClientProtocolClient::new));
+  }
 
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
-    final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
+    final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
     if (request instanceof ReinitializeRequest) {
       RaftProtos.ReinitializeRequestProto proto =
           toReinitializeRequestProto((ReinitializeRequest) request);
@@ -110,14 +110,4 @@ public class GrpcClientRpc implements RaftClientRpc {
       }
     }
   }
-
-  @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
-  }
-
-  @Override
-  public void close() throws IOException {
-    proxies.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
index 5d66480..a838416 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
@@ -55,6 +55,10 @@ public interface HadoopConfigKeys {
           HANDLERS_KEY, HANDLERS_DEFAULT, requireMin(1));
     }
 
+    static void setHandlers(Configuration conf, int handers) {
+      set(conf::setInt, HANDLERS_KEY, handers);
+    }
+
     static InetSocketAddress address(Configuration conf) {
       return getInetSocketAddress(conf::getTrimmed,
           ADDRESS_KEY, ADDRESS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index b35d76b..83f4869 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -19,19 +19,16 @@ package org.apache.ratis.hadooprpc.client;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.util.PeerProxyMap;
 
 import java.io.IOException;
 
-public class HadoopClientRpc implements RaftClientRpc {
-
-  private final PeerProxyMap<CombinedClientProtocolClientSideTranslatorPB> proxies;
-
+public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtocolClientSideTranslatorPB> {
   public HadoopClientRpc(final Configuration conf) {
-    this.proxies  = new PeerProxyMap<>(
-        p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
+    super(new PeerProxyMap<>(
+        p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf)));
   }
 
   @Override
@@ -39,7 +36,7 @@ public class HadoopClientRpc implements RaftClientRpc {
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
     final CombinedClientProtocolClientSideTranslatorPB proxy =
-        proxies.getProxy(serverId);
+        getProxies().getProxy(serverId);
     try {
       if (request instanceof ReinitializeRequest) {
         return proxy.reinitialize((ReinitializeRequest) request);
@@ -60,14 +57,4 @@ public class HadoopClientRpc implements RaftClientRpc {
           GroupMismatchException.class);
     }
   }
-
-  @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
-  }
-
-  @Override
-  public void close() {
-    proxies.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 0a7b430..0512e8c 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -91,9 +91,10 @@ public class HadoopRpcService implements RaftServerRpc {
   private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies;
 
   private HadoopRpcService(RaftServer server, final Configuration conf) {
+    this.id = server.getId();
     this.proxies = new PeerProxyMap<>(
         p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), conf));
-    this.id = server.getId();
+    this.proxies.setName(id.toString());
     try {
       this.ipcServer = newRpcServer(server, conf);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index 51b1d5d..44a1900 100644
--- 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -96,4 +96,9 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
     RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,
         leaderId, delayMs, getMaxTimeout());
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ": sendServerRequest=" + sendServerRequest;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
index 124e7ee..1c21242 100644
--- 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
@@ -17,11 +17,11 @@
  */
 package org.apache.ratis.hadooprpc;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.log4j.Level;
 import org.apache.ratis.RaftBasicTests;
-import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.util.LogUtils;
 import org.junit.Test;
 
@@ -31,16 +31,18 @@ import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServer
 
 public class TestRaftWithHadoopRpc extends RaftBasicTests {
   static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
     LogUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
   }
 
   private final MiniRaftClusterWithHadoopRpc cluster;
 
   public TestRaftWithHadoopRpc() throws IOException {
+    final Configuration conf = new Configuration();
+    HadoopConfigKeys.Ipc.setHandlers(conf, 20);
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, 1000);
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
     cluster = MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(
-        NUM_SERVERS, getProperties());
+        NUM_SERVERS, getProperties(), conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 31f127b..b53e94a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -23,7 +23,6 @@ import org.apache.ratis.netty.client.NettyClientRpc;
 import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
 public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 8a1af8a..1526725 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -17,8 +17,8 @@
  */
 package org.apache.ratis.netty.client;
 
-import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.netty.NettyRpcProxy;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.proto.RaftProtos;
@@ -30,13 +30,15 @@ import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestPro
 
 import java.io.IOException;
 
-public class NettyClientRpc implements RaftClientRpc {
-  private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
+public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+  public NettyClientRpc() {
+    super(new NettyRpcProxy.PeerMap());
+  }
 
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
     final RaftPeerId serverId = request.getServerId();
-    final NettyRpcProxy proxy = proxies.getProxy(serverId);
+    final NettyRpcProxy proxy = getProxies().getProxy(serverId);
 
     final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
     final RaftRpcRequestProto rpcRequest;
@@ -68,14 +70,4 @@ public class NettyClientRpc implements RaftClientRpc {
           proxy.send(rpcRequest, b.build()).getRaftClientReply());
     }
   }
-
-  @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
-  }
-
-  @Override
-  public void close() {
-    proxies.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 3461254..6e9448a 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -76,7 +76,6 @@ public final class NettyRpcService implements RaftServerRpc {
     return new Builder();
   }
 
-
   private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
   private final RaftServer server;
   private final RaftPeerId id;
@@ -100,6 +99,7 @@ public final class NettyRpcService implements RaftServerRpc {
   private NettyRpcService(RaftServer server) {
     this.server = server;
     this.id = server.getId();
+    this.proxies.setName(id.toString());
 
     final ChannelInitializer<SocketChannel> initializer
         = new ChannelInitializer<SocketChannel>() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e711774..a3974d5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -236,6 +236,7 @@ public class LeaderState {
 
   PendingRequest addPendingRequest(long index, RaftClientRequest request,
       TransactionContext entry) {
+    LOG.debug("{}: addPendingRequest at index={}, request={}", server.getId(), index, request);
     return pendingRequests.addPendingRequest(index, request, entry);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 13ef7e8..2525fb0 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -407,7 +407,6 @@ public class RaftServerImpl implements RaftServerProtocol,
   private CompletableFuture<RaftClientReply> appendTransaction(
       RaftClientRequest request, TransactionContext context,
       RetryCache.CacheEntry cacheEntry) throws IOException {
-    LOG.debug("{}: receive client request({})", getId(), request);
     assertLifeCycleState(RUNNING);
     CompletableFuture<RaftClientReply> reply;
 
@@ -441,6 +440,7 @@ public class RaftServerImpl implements RaftServerProtocol,
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
+    LOG.debug("{}: receive client request({})", getId(), request);
     // first check the server's leader state
     CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
     if (reply != null) {
@@ -690,7 +690,7 @@ public class RaftServerImpl implements RaftServerProtocol,
   static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
     if (isHeartbeat) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(message.get());
+        LOG.trace("HEARTBEAT: " + message.get());
       }
     } else {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0595027..0a16954 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -160,6 +160,7 @@ public class RaftServerProxy implements RaftServer {
   @Override
   public CompletableFuture<RaftClientReply> reinitializeAsync(
       ReinitializeRequest request) throws IOException {
+    LOG.info("{}: reinitializeAsync {}", getId(), request);
     getImpl().assertGroup(request.getRequestorId(), request.getRaftGroupId());
     if (!reinitializeRequest.compareAndSet(null, request)) {
       throw new IOException("Another reinitialize is already in progress.");

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 517af66..4423fdb 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -19,6 +19,7 @@ package org.apache.ratis;
 
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
@@ -28,6 +29,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.storage.MemoryRaftLog;
@@ -75,7 +77,11 @@ public abstract class MiniRaftCluster {
 
     @Override
     public void setBlockRequestsFrom(String src, boolean block) {
-      RaftTestUtil.setBlockRequestsFrom(src, block);
+      if (block) {
+        BlockRequestHandlingInjection.getInstance().blockRequestor(src);
+      } else {
+        BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
+      }
     }
 
     public static int getPort(RaftPeerId id, RaftGroup group) {
@@ -430,12 +436,15 @@ public abstract class MiniRaftCluster {
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
-    return RaftClient.newBuilder()
+    final RaftClientRpc rpc = clientFactory.newRaftClientRpc();
+    final RaftClient client = RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leaderId)
-        .setClientRpc(clientFactory.newRaftClientRpc())
+        .setClientRpc(rpc)
         .setProperties(properties)
         .build();
+    rpc.setName(client.getId().toString());
+    return client;
   }
 
   public void shutdown() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 1c7ccd5..5464d4b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -17,24 +17,42 @@
  */
 package org.apache.ratis;
 
+import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.junit.*;
-
-import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
-import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
 public abstract class RaftBasicTests extends BaseTest {
+  {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
   public static final int NUM_SERVERS = 5;
 
   protected static final RaftProperties properties = new RaftProperties();
@@ -105,33 +123,52 @@ public abstract class RaftBasicTests extends BaseTest {
     waitForLeader(cluster, leader);
   }
 
-  static class Client4TestWithLoad extends Thread {
-    final RaftClient client;
+  class Client4TestWithLoad extends Thread {
+    final int index;
     final SimpleMessage[] messages;
 
+    final AtomicBoolean isRunning = new AtomicBoolean(true);
     final AtomicInteger step = new AtomicInteger();
-    volatile Exception exceptionInClientThread;
+    final AtomicReference<Throwable> exceptionInClientThread = new AtomicReference<>();
 
-    Client4TestWithLoad(RaftClient client, int numMessages) {
-      this.client = client;
-      this.messages = SimpleMessage.create(numMessages, client.getId().toString());
+    Client4TestWithLoad(int index, int numMessages) {
+      super("client-" + index);
+      this.index = index;
+      this.messages = SimpleMessage.create(numMessages, index + "-");
     }
 
     boolean isRunning() {
-      return step.get() < messages.length && exceptionInClientThread == null;
+      return isRunning.get();
     }
 
     @Override
     public void run() {
-      try {
-        for (; isRunning(); ) {
-          client.send(messages[step.getAndIncrement()]);
+      try(RaftClient client = getCluster().createClient()) {
+        for (; step.get() < messages.length; ) {
+          final RaftClientReply reply = client.send(messages[step.getAndIncrement()]);
+          Assert.assertTrue(reply.isSuccess());
+        }
+      } catch(Throwable t) {
+        if (exceptionInClientThread.compareAndSet(null, t)) {
+          LOG.error(this + " failed", t);
+        } else {
+          exceptionInClientThread.get().addSuppressed(t);
+          LOG.error(this + " failed again!", t);
         }
-        client.close();
-      } catch (IOException ioe) {
-        exceptionInClientThread = ioe;
+      } finally {
+        isRunning.set(false);
       }
     }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + index
+          + "(step=" + step + "/" + messages.length
+          + ", isRunning=" + isRunning
+          + ", isAlive=" + isAlive()
+          + ", exception=" + exceptionInClientThread
+          + ")";
+    }
   }
 
   @Test
@@ -149,23 +186,56 @@ public abstract class RaftBasicTests extends BaseTest {
 
     final List<Client4TestWithLoad> clients
         = Stream.iterate(0, i -> i+1).limit(numClients)
-        .map(i -> cluster.createClient())
-        .map(c -> new Client4TestWithLoad(c, numMessages))
+        .map(i -> new Client4TestWithLoad(i, numMessages))
         .collect(Collectors.toList());
+    final AtomicInteger lastStep = new AtomicInteger();
+
+    final Timer timer = new Timer();
+    timer.schedule(new TimerTask() {
+      private int previousLastStep = lastStep.get();
+
+      @Override
+      public void run() {
+        LOG.info(cluster.printServers());
+        LOG.info(BlockRequestHandlingInjection.getInstance().toString());
+        LOG.info(cluster.toString());
+        clients.forEach(c -> LOG.info("  " + c));
+        JavaUtils.dumpAllThreads(s -> LOG.info(s));
+
+        final int last = lastStep.get();
+        if (last != previousLastStep) {
+          previousLastStep = last;
+        } else {
+          final RaftServerImpl leader = cluster.getLeader();
+          LOG.info("NO PROGRESS at " + last + ", try to restart leader=" + leader);
+          if (leader != null) {
+            try {
+              cluster.restartServer(leader.getId(), false);
+              LOG.info("Restarted leader=" + leader);
+            } catch (IOException e) {
+              LOG.error("Failed to restart leader=" + leader);
+            }
+          }
+        }
+      }
+    }, 5_000L, 10_000L);
+
     clients.forEach(Thread::start);
 
     int count = 0;
-    for(int lastStep = 0;; ) {
+    for(;; ) {
       if (clients.stream().filter(Client4TestWithLoad::isRunning).count() == 0) {
         break;
       }
 
       final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
-      if (n - lastStep < 50 * numClients) { // Change leader at least 50 steps.
+      Assert.assertTrue(n >= lastStep.get());
+
+      if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 steps.
         Thread.sleep(10);
         continue;
       }
-      lastStep = n;
+      lastStep.set(n);
       count++;
 
       RaftServerImpl leader = cluster.getLeader();
@@ -173,17 +243,14 @@ public abstract class RaftBasicTests extends BaseTest {
         RaftTestUtil.changeLeader(cluster, leader.getId());
       }
     }
+    LOG.info("Leader change count=" + count);
+    timer.cancel();
 
     for(Client4TestWithLoad c : clients) {
-      c.join();
-    }
-    for(Client4TestWithLoad c : clients) {
-      if (c.exceptionInClientThread != null) {
-        throw new AssertionError(c.exceptionInClientThread);
+      if (c.exceptionInClientThread.get() != null) {
+        throw new AssertionError(c.exceptionInClientThread.get());
       }
       RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
     }
-
-    LOG.info("Leader change count=" + count + cluster.printAllLogs());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 65cb5fa..4fe9edc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -297,12 +297,4 @@ public interface RaftTestUtil {
 
     Thread.sleep(3 * maxTimeout);
   }
-
-  static void setBlockRequestsFrom(String src, boolean block) {
-    if (block) {
-      BlockRequestHandlingInjection.getInstance().blockRequestor(src);
-    } else {
-      BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
index 3870e82..abe9dd5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
@@ -45,10 +45,12 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co
   private BlockRequestHandlingInjection() {}
 
   public void blockRequestor(String requestor) {
+    LOG.info("Block requestor " + requestor);
     requestors.put(requestor, true);
   }
 
   public void unblockRequestor(String requestor) {
+    LOG.info("UnBlock requestor " + requestor);
     requestors.remove(requestor);
   }
 
@@ -67,20 +69,29 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co
 
   @Override
   public boolean execute(Object localId, Object remoteId, Object... args) {
-    if (shouldBlock(localId, remoteId)) {
-      try {
-        RaftTestUtil.block(() -> shouldBlock(localId, remoteId));
-        return true;
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted while blocking request handling from " + remoteId
-            + " to " + localId);
-      }
+    if (!shouldBlock(localId, remoteId)) {
+      return false;
     }
-    return false;
+
+    LOG.info(localId + ": Block request from " + remoteId);
+    try {
+      RaftTestUtil.block(() -> shouldBlock(localId, remoteId));
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while blocking request from " + remoteId + " to " + localId, e);
+    }
+    LOG.info(localId + ": unBlock request from " + remoteId);
+    return true;
   }
 
   private boolean shouldBlock(Object localId, Object remoteId) {
     return (localId != null && repliers.containsKey(localId.toString())) ||
         (remoteId != null && requestors.containsKey(remoteId.toString()));
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + ": requestors=" + requestors.keySet()
+        + ", repliers=" + repliers.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
index 6df6176..410228f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
@@ -70,4 +70,9 @@ public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Cod
     }
     return true;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ": delays=" + delays;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a783abd0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index a62ec16..cb28b3c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -21,6 +21,7 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 
 class SimulatedClientRpc
     extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
@@ -30,11 +31,21 @@ class SimulatedClientRpc
   }
 
   @Override
+  public void setName(String name) {
+    // do nothing
+  }
+
+  @Override
   public void addServers(Iterable<RaftPeer> servers) {
     // do nothing
   }
 
   @Override
+  public void handleException(RaftPeerId serverId, Exception e) {
+    // do nothing
+  }
+
+  @Override
   public void close() {
     // do nothing
   }


Reply via email to