http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
new file mode 100644
index 0000000..910ec6e
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.util.ProtoUtils.toByteString;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.util.CheckedRunnable;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class RaftTestUtil {
+  static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
+
+  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
+      throws InterruptedException {
+    final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
+    LOG.info(cluster.printServers());
+    RaftServerImpl leader = null;
+    for(int i = 0; leader == null && i < 10; i++) {
+      Thread.sleep(sleepTime);
+      leader = cluster.getLeader();
+    }
+    LOG.info(cluster.printServers());
+    return leader;
+  }
+
+  public static RaftServerImpl waitForLeader(MiniRaftCluster cluster,
+                                             final String leaderId) throws 
InterruptedException {
+    LOG.info(cluster.printServers());
+    for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) {
+      RaftServerImpl currLeader = cluster.getLeader();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("try enforcing leader to " + leaderId + " but "
+            + (currLeader == null? "no leader for this round"
+                : "new leader is " + currLeader.getId()));
+      }
+    }
+    LOG.info(cluster.printServers());
+
+    final RaftServerImpl leader = cluster.getLeader();
+    Assert.assertEquals(leaderId, leader.getId());
+    return leader;
+  }
+
+  public static String waitAndKillLeader(MiniRaftCluster cluster,
+      boolean expectLeader) throws InterruptedException {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    if (!expectLeader) {
+      Assert.assertNull(leader);
+    } else {
+      Assert.assertNotNull(leader);
+      LOG.info("killing leader = " + leader);
+      cluster.killServer(leader.getId());
+    }
+    return leader != null ? leader.getId() : null;
+  }
+
+  public static boolean logEntriesContains(LogEntryProto[] entries,
+      SimpleMessage... expectedMessages) {
+    int idxEntries = 0;
+    int idxExpected = 0;
+    while (idxEntries < entries.length
+        && idxExpected < expectedMessages.length) {
+      if 
(Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
+          entries[idxEntries].getSmLogEntry().getData().toByteArray())) {
+        ++idxExpected;
+      }
+      ++idxEntries;
+    }
+    return idxExpected == expectedMessages.length;
+  }
+
+  public static void assertLogEntries(Collection<RaftServerImpl> servers,
+                                      SimpleMessage... expectedMessages) {
+    final int size = servers.size();
+    final long count = servers.stream()
+        .filter(RaftServerImpl::isAlive)
+        .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE))
+        .filter(e -> logEntriesContains(e, expectedMessages))
+        .count();
+    if (2*count <= size) {
+      throw new AssertionError("Not in majority: size=" + size
+          + " but count=" + count);
+    }
+  }
+
+  public static void assertLogEntries(LogEntryProto[] entries, long startIndex,
+      long expertedTerm, SimpleMessage... expectedMessages) {
+    Assert.assertEquals(expectedMessages.length, entries.length);
+    for(int i = 0; i < entries.length; i++) {
+      final LogEntryProto e = entries[i];
+      Assert.assertEquals(expertedTerm, e.getTerm());
+      Assert.assertEquals(startIndex + i, e.getIndex());
+      Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
+          e.getSmLogEntry().getData().toByteArray());
+    }
+  }
+
+  public static class SimpleMessage implements Message {
+    public static SimpleMessage[] create(int numMessages) {
+      return create(numMessages, "m");
+    }
+
+    public static SimpleMessage[] create(int numMessages, String prefix) {
+      final SimpleMessage[] messages = new SimpleMessage[numMessages];
+      for (int i = 0; i < messages.length; i++) {
+        messages[i] = new SimpleMessage(prefix + i);
+      }
+      return messages;
+    }
+
+    final String messageId;
+
+    public SimpleMessage(final String messageId) {
+      this.messageId = messageId;
+    }
+
+    @Override
+    public String toString() {
+      return messageId;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || !(obj instanceof SimpleMessage)) {
+        return false;
+      } else {
+        final SimpleMessage that = (SimpleMessage)obj;
+        return this.messageId.equals(that.messageId);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return messageId.hashCode();
+    }
+
+    @Override
+    public ByteString getContent() {
+      return toByteString(messageId.getBytes(Charset.forName("UTF-8")));
+    }
+  }
+
+  public static class SimpleOperation {
+    private final String op;
+
+    public SimpleOperation(String op) {
+      Preconditions.checkArgument(op != null);
+      this.op = op;
+    }
+
+    @Override
+    public String toString() {
+      return op;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj == this ||
+          (obj instanceof SimpleOperation &&
+              ((SimpleOperation) obj).op.equals(op));
+    }
+
+    @Override
+    public int hashCode() {
+      return op.hashCode();
+    }
+
+    public SMLogEntryProto getLogEntryContent() {
+      try {
+        return SMLogEntryProto.newBuilder()
+            .setData(toByteString(op.getBytes("UTF-8"))).build();
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static File getTestDir(Class<?> caller) throws IOException {
+    File dir = new File(System.getProperty("test.build.data", 
"target/test/data")
+            + "/" + RandomStringUtils.randomAlphanumeric(10),
+            caller.getSimpleName());
+    if (dir.exists() && !dir.isDirectory()) {
+      throw new IOException(dir + " already exists and is not a directory");
+    } else if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create directory " + dir);
+    }
+    return dir;
+  }
+
+  public static void block(BooleanSupplier isBlocked) throws 
InterruptedException {
+    for(; isBlocked.getAsBoolean(); ) {
+      
Thread.sleep(RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+    }
+  }
+
+  public static void delay(IntSupplier getDelayMs) throws InterruptedException 
{
+    final int t = getDelayMs.getAsInt();
+    if (t > 0) {
+      Thread.sleep(t);
+    }
+  }
+
+  public static <T extends Throwable> void attempt(
+      int n, long sleepMs, CheckedRunnable<T> runnable)
+      throws T, InterruptedException {
+    for(int i = 1; i <= n; i++) {
+      LOG.info("Attempt #" + i + "/" + n +  ": sleep " + sleepMs + "ms");
+      if (sleepMs > 0) {
+        Thread.sleep(sleepMs);
+      }
+      try {
+        runnable.run();
+        return;
+      } catch (Throwable t) {
+        if (i == n) {
+          throw t;
+        }
+        LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and 
retry.");
+      }
+    }
+  }
+
+  public static String changeLeader(MiniRaftCluster cluster, String oldLeader)
+      throws InterruptedException {
+    cluster.setBlockRequestsFrom(oldLeader, true);
+    String newLeader = oldLeader;
+    for(int i = 0; i < 10 && newLeader.equals(oldLeader); i++) {
+      newLeader = RaftTestUtil.waitForLeader(cluster).getId();
+    }
+    cluster.setBlockRequestsFrom(oldLeader, false);
+    return newLeader;
+  }
+
+  public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers,
+      DelayLocalExecutionInjection injection, String leaderId, int delayMs,
+      long maxTimeout) throws InterruptedException {
+    // block reqeusts sent to leader if delayMs > 0
+    final boolean block = delayMs > 0;
+    LOG.debug("{} requests sent to leader {} and set {}ms delay for the 
others",
+        block? "Block": "Unblock", leaderId, delayMs);
+    if (block) {
+      BlockRequestHandlingInjection.getInstance().blockReplier(leaderId);
+    } else {
+      BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId);
+    }
+
+    // delay RaftServerRequest for other servers
+    servers.stream().filter(s -> !s.getId().equals(leaderId))
+        .forEach(s -> {
+          if (block) {
+            injection.setDelayMs(s.getId(), delayMs);
+          } else {
+            injection.removeDelay(s.getId());
+          }
+        });
+
+    Thread.sleep(3 * maxTimeout);
+  }
+
+  public static void setBlockRequestsFrom(String src, boolean block) {
+    if (block) {
+      BlockRequestHandlingInjection.getInstance().blockRequestor(src);
+    } else {
+      BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
new file mode 100644
index 0000000..a7f1b6d
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Inject code to block a server from handling incoming requests. */
+public class BlockRequestHandlingInjection implements 
CodeInjectionForTesting.Code {
+  private static final BlockRequestHandlingInjection INSTANCE =
+      new BlockRequestHandlingInjection();
+
+  static {
+    CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE);
+    CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE);
+  }
+
+  public static BlockRequestHandlingInjection getInstance() {
+    return INSTANCE;
+  }
+
+  private final Map<String, Boolean> requestors = new ConcurrentHashMap<>();
+  private final Map<String, Boolean> repliers = new ConcurrentHashMap<>();
+
+  private BlockRequestHandlingInjection() {}
+
+  public void blockRequestor(String requestor) {
+    requestors.put(requestor, true);
+  }
+
+  public void unblockRequestor(String requestor) {
+    requestors.remove(requestor);
+  }
+
+  public void blockReplier(String replier) {
+    repliers.put(replier, true);
+  }
+
+  public void unblockReplier(String replier) {
+    repliers.remove(replier);
+  }
+
+  public void unblockAll() {
+    requestors.clear();
+    repliers.clear();
+  }
+
+  @Override
+  public boolean execute(String localId, String remoteId, Object... args) {
+    if (shouldBlock(localId, remoteId)) {
+      try {
+        RaftTestUtil.block(() -> shouldBlock(localId, remoteId));
+        return true;
+      } catch (InterruptedException e) {
+        LOG.debug("Interrupted while blocking request handling from " + 
remoteId
+            + " to " + localId);
+      }
+    }
+    return false;
+  }
+
+  private boolean shouldBlock(String localId, String remoteId) {
+    return repliers.containsKey(localId) || requestors.containsKey(remoteId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
new file mode 100644
index 0000000..8de2474
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.util.CodeInjectionForTesting;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Inject code to delay particular servers. */
+public class DelayLocalExecutionInjection implements 
CodeInjectionForTesting.Code {
+  private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>();
+
+  public DelayLocalExecutionInjection(String method) {
+    CodeInjectionForTesting.put(method, this);
+  }
+
+  public void clear() {
+    delays.clear();
+  }
+
+  public void setDelayMs(String id, int delayMs) {
+    AtomicInteger d = delays.get(id);
+    if (d == null) {
+      delays.put(id, d = new AtomicInteger());
+    }
+    d.set(delayMs);
+  }
+
+  public void removeDelay(String id) {
+    delays.remove(id);
+  }
+
+  @Override
+  public boolean execute(String localId, String remoteId, Object... args) {
+    final AtomicInteger d = delays.get(localId);
+    if (d == null) {
+      return false;
+    }
+    LOG.info("{} delay {} ms, args={}", localId, d.get(),
+        Arrays.toString(args));
+    try {
+      RaftTestUtil.delay(d::get);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while delaying " + localId);
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
new file mode 100644
index 0000000..30b334f
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static java.util.Arrays.asList;
+import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static 
org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
+import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.MiniRaftCluster.PeerChanges;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.ReconfigurationTimeoutException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RaftReconfigurationBaseTest {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+  static final Logger LOG = 
LoggerFactory.getLogger(RaftReconfigurationBaseTest.class);
+
+  protected static final RaftProperties prop = new RaftProperties();
+
+  @BeforeClass
+  public static void setup() {
+    // set a small gap for tests
+    prop.setInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, 10);
+  }
+
+  public abstract MiniRaftCluster getCluster(int peerNum) throws IOException;
+
+  private static int getStagingGap() {
+    return 
prop.getInt(RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
+  }
+
+  /**
+   * add 2 new peers (3 peers -> 5 peers), no leader change
+   */
+  @Test
+  public void testAddPeers() throws Exception {
+    LOG.info("Start testAddPeers");
+    MiniRaftCluster cluster = getCluster(3);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      // add new peers
+      RaftPeer[] allPeers = cluster.addNewPeers(2, true).allPeersInNewConf;
+
+      // trigger setConfiguration
+      SetConfigurationRequest request = new SetConfigurationRequest("client",
+          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+      LOG.info("Start changing the configuration: {}", request);
+      cluster.getLeader().setConfiguration(request);
+
+      // wait for the new configuration to take effect
+      waitAndCheckNewConf(cluster, allPeers, 0, null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * remove 2 peers (5 peers -> 3 peers), no leader change
+   */
+  @Test
+  public void testRemovePeers() throws Exception {
+    LOG.info("Start testRemovePeers");
+    MiniRaftCluster cluster = getCluster(5);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      // remove peers, leader still included in the new conf
+      RaftPeer[] allPeers = cluster
+          .removePeers(2, false, Collections.emptyList()).allPeersInNewConf;
+
+      // trigger setConfiguration
+      SetConfigurationRequest request = new SetConfigurationRequest("client",
+          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+      LOG.info("Start changing the configuration: {}", request);
+      cluster.getLeader().setConfiguration(request);
+
+      // wait for the new configuration to take effect
+      waitAndCheckNewConf(cluster, allPeers, 2, null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * 5 peers -> 5 peers, remove 2 old, add 2 new, no leader change
+   */
+  @Test
+  public void testAddRemovePeers() throws Exception {
+    LOG.info("Start testAddRemovePeers");
+    testAddRemovePeers(false);
+  }
+
+  @Test
+  public void testLeaderStepDown() throws Exception {
+    LOG.info("Start testLeaderStepDown");
+    testAddRemovePeers(true);
+  }
+
+  private void testAddRemovePeers(boolean leaderStepdown) throws Exception {
+    MiniRaftCluster cluster = getCluster(5);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      PeerChanges change = cluster.addNewPeers(2, true);
+      RaftPeer[] allPeers = cluster.removePeers(2, leaderStepdown,
+          asList(change.newPeers)).allPeersInNewConf;
+
+      // trigger setConfiguration
+      SetConfigurationRequest request = new SetConfigurationRequest("client",
+          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+      LOG.info("Start changing the configuration: {}", request);
+      cluster.getLeader().setConfiguration(request);
+
+      // wait for the new configuration to take effect
+      waitAndCheckNewConf(cluster, allPeers, 2, null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testReconfTwice() throws Exception {
+    LOG.info("Start testReconfTwice");
+    final MiniRaftCluster cluster = getCluster(3);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient("client", leaderId);
+
+      // submit some msgs before reconf
+      for (int i = 0; i < getStagingGap() * 2; i++) {
+        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      final AtomicBoolean reconf1 = new AtomicBoolean(false);
+      final AtomicBoolean reconf2 = new AtomicBoolean(false);
+      final AtomicReference<RaftPeer[]> finalPeers = new 
AtomicReference<>(null);
+      final AtomicReference<RaftPeer[]> deadPeers = new 
AtomicReference<>(null);
+      CountDownLatch latch = new CountDownLatch(1);
+      Thread clientThread = new Thread(() -> {
+        try {
+          PeerChanges c1 = cluster.addNewPeers(2, true);
+          LOG.info("Start changing the configuration: {}",
+              asList(c1.allPeersInNewConf));
+
+          RaftClientReply reply = 
client.setConfiguration(c1.allPeersInNewConf);
+          reconf1.set(reply.isSuccess());
+
+          PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
+          finalPeers.set(c2.allPeersInNewConf);
+          deadPeers.set(c2.removedPeers);
+
+          LOG.info("Start changing the configuration again: {}",
+              asList(c2.allPeersInNewConf));
+          reply = client.setConfiguration(c2.allPeersInNewConf);
+          reconf2.set(reply.isSuccess());
+
+          latch.countDown();
+          client.close();
+        } catch (IOException ignored) {
+        }
+      });
+      clientThread.start();
+
+      latch.await();
+      Assert.assertTrue(reconf1.get());
+      Assert.assertTrue(reconf2.get());
+      waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
+
+      // check configuration manager's internal state
+      // each reconf will generate two configurations: (old, new) and (new)
+      cluster.getServers().stream().filter(RaftServerImpl::isAlive)
+          .forEach(server -> {
+        ConfigurationManager confManager =
+            (ConfigurationManager) Whitebox.getInternalState(server.getState(),
+                "configurationManager");
+        // each reconf will generate two configurations: (old, new) and (new)
+        Assert.assertEquals(5, confManager.numOfConf());
+      });
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReconfTimeout() throws Exception {
+    LOG.info("Start testReconfTimeout");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(3);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient("client", leaderId);
+
+      PeerChanges c1 = cluster.addNewPeers(2, false);
+
+      LOG.info("Start changing the configuration: {}",
+          asList(c1.allPeersInNewConf));
+      Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());
+
+      final RaftClientRequestSender sender = 
((RaftClientImpl)client).getRequestSender();
+      final SetConfigurationRequest request = new SetConfigurationRequest(
+          "client", leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf);
+      try {
+        sender.sendRequest(request);
+        Assert.fail("did not get expected exception");
+      } catch (IOException e) {
+        Assert.assertTrue("Got exception " + e,
+            e instanceof ReconfigurationTimeoutException);
+      }
+
+      // the two new peers have not started yet, the bootstrapping must timeout
+      LOG.info(cluster.printServers());
+
+      // resend the same request, make sure the server has correctly reset its
+      // state so that we still get timeout instead of in-progress exception
+      try {
+        sender.sendRequest(request);
+        Assert.fail("did not get expected exception");
+      } catch (IOException e) {
+        Assert.assertTrue("Got exception " + e,
+            e instanceof ReconfigurationTimeoutException);
+      }
+
+      // start the two new peers
+      LOG.info("Start new peers");
+      for (RaftPeer np : c1.newPeers) {
+        cluster.startServer(np.getId());
+      }
+      
Assert.assertTrue(client.setConfiguration(c1.allPeersInNewConf).isSuccess());
+      client.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBootstrapReconf() throws Exception {
+    LOG.info("Start testBootstrapReconf");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(3);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient("client", leaderId);
+
+      // submit some msgs before reconf
+      for (int i = 0; i < getStagingGap() * 2; i++) {
+        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      PeerChanges c1 = cluster.addNewPeers(2, true);
+      LOG.info("Start changing the configuration: {}",
+          asList(c1.allPeersInNewConf));
+      final AtomicReference<Boolean> success = new AtomicReference<>();
+
+      Thread clientThread = new Thread(() -> {
+        try {
+          RaftClientReply reply = 
client.setConfiguration(c1.allPeersInNewConf);
+          success.set(reply.isSuccess());
+          client.close();
+        } catch (IOException ioe) {
+          LOG.error("FAILED", ioe);
+        }
+      });
+      clientThread.start();
+
+      Thread.sleep(5000);
+      LOG.info(cluster.printServers());
+      assertSuccess(success);
+
+      final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+      for (RaftPeer newPeer : c1.newPeers) {
+        Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
+            cluster.getServer(newPeer.getId()).getState().getLog()
+                .getEntries(0, Long.MAX_VALUE));
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * kill the leader before reconfiguration finishes. Make sure the client 
keeps
+   * retrying.
+   */
+  @Test
+  public void testKillLeaderDuringReconf() throws Exception {
+    LOG.info("Start testKillLeaderDuringReconf");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(3);
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient("client", leaderId);
+
+      PeerChanges c1 = cluster.addNewPeers(2, false);
+      PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers));
+
+      LOG.info("Start changing the configuration: {}",
+          asList(c2.allPeersInNewConf));
+      final AtomicReference<Boolean> success = new AtomicReference<>();
+      final AtomicBoolean clientRunning = new AtomicBoolean(true);
+      Thread clientThread = new Thread(() -> {
+        try {
+          boolean r = false;
+          while (clientRunning.get() && !r) {
+            r = client.setConfiguration(c2.allPeersInNewConf).isSuccess();
+          }
+          success.set(r);
+          client.close();
+        } catch (IOException ignored) {
+        }
+      });
+      clientThread.start();
+
+      // the leader cannot generate the (old, new) conf, and it will keep
+      // bootstrapping the 2 new peers since they have not started yet
+      LOG.info(cluster.printServers());
+      Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());
+
+      // only the first empty entry got committed
+      final long committedIndex = cluster.getLeader().getState().getLog()
+          .getLastCommittedIndex();
+      Assert.assertTrue("committedIndex is " + committedIndex,
+          committedIndex <= 1);
+
+      LOG.info("kill the current leader");
+      final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true);
+      LOG.info("start the two new peers: {}", Arrays.asList(c1.newPeers));
+      for (RaftPeer np : c1.newPeers) {
+        cluster.startServer(np.getId());
+      }
+
+      Thread.sleep(3000);
+      // the client should get the NotLeaderException from the first leader, 
and
+      // will retry the same setConfiguration request
+      waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2,
+          Collections.singletonList(oldLeaderId));
+      clientRunning.set(false);
+      //Assert.assertTrue(success.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static void assertSuccess(final AtomicReference<Boolean> success) {
+    final String s = "success=" + success;
+    Assert.assertNotNull(s, success.get());
+    Assert.assertTrue(s, success.get());
+  }
+
+  /**
+   * When a request's new configuration is the same with the current one, make
+   * sure we return success immediately and no log entry is recorded.
+   */
+  @Test
+  public void testNoChangeRequest() throws Exception {
+    LOG.info("Start testNoChangeRequest");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(3);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      final String leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient("client", leaderId);
+      client.send(new SimpleMessage("m"));
+
+      final long committedIndex = cluster.getLeader().getState().getLog()
+          .getLastCommittedIndex();
+      final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
+
+      // no real configuration change in the request
+      RaftClientReply reply = client.setConfiguration(cluster.getPeers()
+          .toArray(new RaftPeer[0]));
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(committedIndex, cluster.getLeader().getState()
+          .getLog().getLastCommittedIndex());
+      Assert.assertSame(confBefore, cluster.getLeader().getRaftConf());
+      client.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Make sure a setConfiguration request is rejected if a configuration change
+   * is still in progress (i.e., has not been committed yet).
+   */
+  @Test
+  public void testOverlappedSetConfRequests() throws Exception {
+    LOG.info("Start testOverlappedSetConfRequests");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(3);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      final String leaderId = cluster.getLeader().getId();
+
+      RaftPeer[] newPeers = cluster.addNewPeers(2, true).allPeersInNewConf;
+
+      // delay every peer's logSync so that the setConf request is delayed
+      cluster.getPeers()
+          .forEach(peer -> logSyncDelay.setDelayMs(peer.getId(), 1000));
+
+      final CountDownLatch latch = new CountDownLatch(1);
+      final RaftPeer[] peersInRequest2 = cluster.getPeers().toArray(new 
RaftPeer[0]);
+      AtomicBoolean caughtException = new AtomicBoolean(false);
+      new Thread(() -> {
+        try(final RaftClient client2 = cluster.createClient("client2", 
leaderId)) {
+          latch.await();
+          LOG.info("client2 starts to change conf");
+          final RaftClientRequestSender sender2 = 
((RaftClientImpl)client2).getRequestSender();
+          sender2.sendRequest(new SetConfigurationRequest(
+              "client2", leaderId, DEFAULT_SEQNUM, peersInRequest2));
+        } catch (ReconfigurationInProgressException e) {
+          caughtException.set(true);
+        } catch (Exception e) {
+          LOG.warn("Got unexpected exception when client2 changes conf", e);
+        }
+      }).start();
+
+      AtomicBoolean confChanged = new AtomicBoolean(false);
+      new Thread(() -> {
+        try(final RaftClient client1 = cluster.createClient("client1", 
leaderId)) {
+          LOG.info("client1 starts to change conf");
+          confChanged.set(client1.setConfiguration(newPeers).isSuccess());
+        } catch (IOException e) {
+          LOG.warn("Got unexpected exception when client1 changes conf", e);
+        }
+      }).start();
+      Thread.sleep(100);
+      latch.countDown();
+
+      for (int i = 0; i < 10 && !confChanged.get(); i++) {
+        Thread.sleep(1000);
+      }
+      Assert.assertTrue(confChanged.get());
+      Assert.assertTrue(caughtException.get());
+    } finally {
+      logSyncDelay.clear();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test a scenario where the follower truncates its log entries which causes
+   * configuration change.
+   */
+  @Test
+  public void testRevertConfigurationChange() throws Exception {
+    LOG.info("Start testRevertConfigurationChange");
+    // originally 3 peers
+    final MiniRaftCluster cluster = getCluster(5);
+    try {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      final String leaderId = cluster.getLeader().getId();
+
+      final RaftLog log = cluster.getServer(leaderId).getState().getLog();
+      Thread.sleep(1000);
+      Assert.assertEquals(0, log.getLatestFlushedIndex());
+
+      // we block the incoming msg for the leader and block its requests to
+      // followers, so that we force the leader change and the old leader will
+      // not know
+      LOG.info("start blocking the leader");
+      BlockRequestHandlingInjection.getInstance().blockReplier(leaderId);
+      cluster.setBlockRequestsFrom(leaderId, true);
+
+      PeerChanges change = cluster.removePeers(1, false, new ArrayList<>());
+
+      AtomicBoolean gotNotLeader = new AtomicBoolean(false);
+      new Thread(() -> {
+        try(final RaftClient client = cluster.createClient("client1", 
leaderId)) {
+          LOG.info("client starts to change conf");
+          final RaftClientRequestSender sender = 
((RaftClientImpl)client).getRequestSender();
+          RaftClientReply reply = sender.sendRequest(new 
SetConfigurationRequest(
+              "client", leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf));
+          if (reply.isNotLeader()) {
+            gotNotLeader.set(true);
+          }
+        } catch (IOException e) {
+          LOG.warn("Got unexpected exception when client1 changes conf", e);
+        }
+      }).start();
+
+      // wait till the old leader persist the new conf
+      for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) {
+        Thread.sleep(500);
+      }
+      Assert.assertEquals(1, log.getLatestFlushedIndex());
+      Assert.assertEquals(CONFIGURATIONENTRY,
+          log.getLastEntry().getLogEntryBodyCase());
+
+      // unblock the old leader
+      BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId);
+      cluster.setBlockRequestsFrom(leaderId, false);
+
+      // the client should get NotLeaderException
+      for (int i = 0; i < 10 && !gotNotLeader.get(); i++) {
+        Thread.sleep(500);
+      }
+      Assert.assertTrue(gotNotLeader.get());
+
+      // the old leader should have truncated the setConf from the log
+      boolean newState = false;
+      for (int i = 0; i < 10 && !newState; i++) {
+        Thread.sleep(500);
+        newState = log.getLastCommittedIndex() == 1 &&
+            log.getLastEntry().getLogEntryBodyCase() != CONFIGURATIONENTRY;
+      }
+      Assert.assertTrue(newState);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
new file mode 100644
index 0000000..dc10bd3
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.statemachine.StateMachine;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+public class RaftServerTestUtil {
+  static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
+
+  public static void waitAndCheckNewConf(MiniRaftCluster cluster,
+      RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers)
+      throws Exception {
+    final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2);
+    RaftTestUtil.attempt(3, sleepMs,
+        () -> waitAndCheckNewConf(cluster, peers, deadPeers));
+  }
+  private static void waitAndCheckNewConf(MiniRaftCluster cluster,
+      RaftPeer[] peers, Collection<String> deadPeers)
+      throws Exception {
+    LOG.info(cluster.printServers());
+    Assert.assertNotNull(cluster.getLeader());
+
+    int numIncluded = 0;
+    int deadIncluded = 0;
+    final RaftConfiguration current = RaftConfiguration.newBuilder()
+        .setConf(peers).setLogEntryIndex(0).build();
+    for (RaftServerImpl server : cluster.getServers()) {
+      if (deadPeers != null && deadPeers.contains(server.getId())) {
+        if (current.containsInConf(server.getId())) {
+          deadIncluded++;
+        }
+        continue;
+      }
+      if (current.containsInConf(server.getId())) {
+        numIncluded++;
+        Assert.assertTrue(server.getRaftConf().isStable());
+        Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
+      } else {
+        Assert.assertFalse(server.getId() + " is still running: " + server,
+            server.isAlive());
+      }
+    }
+    Assert.assertEquals(peers.length, numIncluded + deadIncluded);
+  }
+
+  public static StateMachine getStateMachine(RaftServerImpl s) {
+    return s.getStateMachine();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
new file mode 100644
index 0000000..fa177af
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
+  static final Logger LOG = 
LoggerFactory.getLogger(MiniRaftClusterWithSimulatedRpc.class);
+
+  public static final Factory<MiniRaftClusterWithSimulatedRpc> FACTORY
+      = new Factory<MiniRaftClusterWithSimulatedRpc>() {
+    @Override
+    public MiniRaftClusterWithSimulatedRpc newCluster(
+        String[] ids, RaftProperties prop, boolean formatted) {
+      prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
+      return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted);
+    }
+  };
+
+  private SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply;
+  private SimulatedClientRequestReply client2serverRequestReply;
+
+  public MiniRaftClusterWithSimulatedRpc(int numServers,
+      RaftProperties properties) {
+    this(generateIds(numServers, 0), properties, true);
+  }
+
+  public MiniRaftClusterWithSimulatedRpc(String[] ids,
+      RaftProperties properties, boolean formatted) {
+    super(ids, properties, formatted);
+    initRpc();
+  }
+
+  private void initRpc() {
+    final Collection<RaftPeer> peers = getConf().getPeers();
+    final int simulateLatencyMs = properties.getInt(
+        SimulatedRequestReply.SIMULATE_LATENCY_KEY,
+        SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT);
+    LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = "
+        + simulateLatencyMs);
+    serverRequestReply = new SimulatedRequestReply<>(peers, simulateLatencyMs);
+    client2serverRequestReply = new SimulatedClientRequestReply(peers,
+        simulateLatencyMs);
+
+    setRpcServers(getServers());
+  }
+
+  private void setRpcServers(Collection<RaftServerImpl> newServers) {
+    newServers.forEach(s -> s.setServerRpc(
+        new SimulatedServerRpc(s, serverRequestReply, 
client2serverRequestReply)));
+  }
+
+  @Override
+  protected void setPeerRpc() {
+    initRpc();
+  }
+
+  private void addPeersToRpc(Collection<RaftPeer> peers) {
+    serverRequestReply.addPeers(peers);
+    client2serverRequestReply.addPeers(peers);
+  }
+
+  @Override
+  public void restartServer(String id, boolean format) throws IOException {
+    super.restartServer(id, format);
+    RaftServerImpl s = getServer(id);
+    addPeersToRpc(Collections.singletonList(conf.getPeer(id)));
+    s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply,
+        client2serverRequestReply));
+    s.start();
+  }
+
+  @Override
+  public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
+                                          Collection<RaftServerImpl> 
newServers, boolean startService) {
+    addPeersToRpc(newPeers);
+    setRpcServers(newServers);
+    if (startService) {
+      newServers.forEach(RaftServerImpl::start);
+    }
+    return newPeers;
+  }
+
+  @Override
+  public RaftClientRequestSender getRaftClientRequestSender() {
+    return client2serverRequestReply;
+  }
+
+  @Override
+  public void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException {
+    // block leader sendRequest if delayMs > 0
+    final boolean block = delayMs > 0;
+    LOG.debug("{} leader queue {} and set {}ms delay for the other queues",
+        block? "Block": "Unblock", leaderId, delayMs);
+    serverRequestReply.getQueue(leaderId).blockSendRequestTo.set(block);
+
+    // set delay takeRequest for the other queues
+    getServers().stream().filter(s -> !s.getId().equals(leaderId))
+        .map(s -> serverRequestReply.getQueue(s.getId()))
+        .forEach(q -> q.delayTakeRequestTo.set(delayMs));
+
+    final long sleepMs = 3 * getMaxTimeout() / 2;
+    Thread.sleep(sleepMs);
+  }
+
+  @Override
+  public void setBlockRequestsFrom(String src, boolean block) {
+    serverRequestReply.getQueue(src).blockTakeRequestFrom.set(block);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
new file mode 100644
index 0000000..a157524
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.protocol.RaftRpcMessage;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+
+import com.google.common.base.Preconditions;
+
+public class RaftServerReply extends RaftRpcMessage {
+  private final AppendEntriesReplyProto appendEntries;
+  private final RequestVoteReplyProto requestVote;
+  private final InstallSnapshotReplyProto installSnapshot;
+
+  RaftServerReply(AppendEntriesReplyProto a) {
+    appendEntries = Preconditions.checkNotNull(a);
+    requestVote = null;
+    installSnapshot = null;
+  }
+
+  RaftServerReply(RequestVoteReplyProto r) {
+    appendEntries = null;
+    requestVote = Preconditions.checkNotNull(r);
+    installSnapshot = null;
+  }
+
+  RaftServerReply(InstallSnapshotReplyProto i) {
+    appendEntries = null;
+    requestVote = null;
+    installSnapshot = Preconditions.checkNotNull(i);
+  }
+
+  boolean isAppendEntries() {
+    return appendEntries != null;
+  }
+
+  boolean isRequestVote() {
+    return requestVote != null;
+  }
+
+  boolean isInstallSnapshot() {
+    return installSnapshot != null;
+  }
+
+  AppendEntriesReplyProto getAppendEntries() {
+    return appendEntries;
+  }
+
+  RequestVoteReplyProto getRequestVote() {
+    return requestVote;
+  }
+
+  InstallSnapshotReplyProto getInstallSnapshot() {
+    return installSnapshot;
+  }
+
+  @Override
+  public boolean isRequest() {
+    return false;
+  }
+
+  @Override
+  public String getRequestorId() {
+    if (isAppendEntries()) {
+      return appendEntries.getServerReply().getRequestorId();
+    } else if (isRequestVote()) {
+      return requestVote.getServerReply().getRequestorId();
+    } else {
+      return installSnapshot.getServerReply().getRequestorId();
+    }
+  }
+
+  @Override
+  public String getReplierId() {
+    if (isAppendEntries()) {
+      return appendEntries.getServerReply().getReplyId();
+    } else if (isRequestVote()) {
+      return requestVote.getServerReply().getReplyId();
+    } else {
+      return installSnapshot.getServerReply().getReplyId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
new file mode 100644
index 0000000..fd73dff
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.protocol.RaftRpcMessage;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+
+class RaftServerRequest extends RaftRpcMessage {
+  private final AppendEntriesRequestProto appendEntries;
+  private final RequestVoteRequestProto requestVote;
+  private final InstallSnapshotRequestProto installSnapshot;
+
+  RaftServerRequest(AppendEntriesRequestProto a) {
+    appendEntries = a;
+    requestVote = null;
+    installSnapshot = null;
+  }
+
+  RaftServerRequest(RequestVoteRequestProto r) {
+    appendEntries = null;
+    requestVote = r;
+    installSnapshot = null;
+  }
+
+  RaftServerRequest(InstallSnapshotRequestProto i) {
+    appendEntries = null;
+    requestVote = null;
+    installSnapshot = i;
+  }
+
+  boolean isAppendEntries() {
+    return appendEntries != null;
+  }
+
+  boolean isRequestVote() {
+    return requestVote != null;
+  }
+
+  boolean isInstallSnapshot() {
+    return installSnapshot != null;
+  }
+
+  AppendEntriesRequestProto getAppendEntries() {
+    return appendEntries;
+  }
+
+  RequestVoteRequestProto getRequestVote() {
+    return requestVote;
+  }
+
+  InstallSnapshotRequestProto getInstallSnapshot() {
+    return installSnapshot;
+  }
+
+  @Override
+  public boolean isRequest() {
+    return true;
+  }
+
+  @Override
+  public String getRequestorId() {
+    if (isAppendEntries()) {
+      return appendEntries.getServerRequest().getRequestorId();
+    } else if (isRequestVote()) {
+      return requestVote.getServerRequest().getRequestorId();
+    } else {
+      return installSnapshot.getServerRequest().getRequestorId();
+    }
+  }
+
+  @Override
+  public String getReplierId() {
+    if (isAppendEntries()) {
+      return appendEntries.getServerRequest().getReplyId();
+    } else if (isRequestVote()) {
+      return requestVote.getServerRequest().getReplyId();
+    } else {
+      return installSnapshot.getServerRequest().getReplyId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
new file mode 100644
index 0000000..bd60a3b
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.protocol.RaftRpcMessage;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RequestHandler<REQUEST extends RaftRpcMessage,
+    REPLY extends RaftRpcMessage> {
+  public static final Logger LOG = 
LoggerFactory.getLogger(RequestHandler.class);
+
+  interface HandlerInterface<REQUEST extends RaftRpcMessage,
+      REPLY extends RaftRpcMessage> {
+
+    boolean isAlive();
+
+    REPLY handleRequest(REQUEST r) throws IOException;
+  }
+
+  private final String serverId;
+  private final String name;
+  private final SimulatedRequestReply<REQUEST, REPLY> rpc;
+  private final HandlerInterface<REQUEST, REPLY> handlerImpl;
+  private final List<HandlerDaemon> daemons;
+
+  RequestHandler(String serverId, String name,
+                 SimulatedRequestReply<REQUEST, REPLY> rpc,
+                 HandlerInterface<REQUEST, REPLY> handlerImpl,
+                 int numHandlers) {
+    this.serverId = serverId;
+    this.name = name;
+    this.rpc = rpc;
+    this.handlerImpl = handlerImpl;
+
+    this.daemons = new ArrayList<>(numHandlers);
+    for(int i = 0; i < numHandlers; i++) {
+      daemons.add(new HandlerDaemon(i));
+    }
+  }
+
+  void startDaemon() {
+    daemons.forEach(Thread::start);
+  }
+
+  void shutdown() {
+    rpc.shutdown(serverId);
+  }
+
+  void interruptAndJoinDaemon() throws InterruptedException {
+    daemons.forEach(Thread::interrupt);
+    for (Daemon d : daemons) {
+      d.join();
+    }
+  }
+
+  SimulatedRequestReply<REQUEST, REPLY> getRpc() {
+    return rpc;
+  }
+
+  void handleRequest(REQUEST request) throws IOException {
+    final REPLY reply;
+    try {
+      reply = handlerImpl.handleRequest(request);
+    } catch (IOException ioe) {
+      LOG.debug("IOException for " + request, ioe);
+      rpc.sendReply(request, null, ioe);
+      return;
+    }
+    if (reply != null) {
+      rpc.sendReply(request, reply, null);
+    }
+  }
+
+  /**
+   * A thread keep polling requests from the request queue. Used for 
simulation.
+   */
+  class HandlerDaemon extends Daemon {
+    private final int id;
+
+    HandlerDaemon(int id) {
+      this.id = id;
+    }
+
+    @Override
+    public String toString() {
+      return serverId + "." + name + id;
+    }
+
+    @Override
+    public void run() {
+      while (handlerImpl.isAlive()) {
+        try {
+          handleRequest(rpc.takeRequest(serverId));
+        } catch (InterruptedIOException e) {
+          LOG.info(this + " is interrupted by " + e);
+          LOG.trace("TRACE", e);
+          break;
+        } catch (IOException e) {
+          LOG.error(this + " has " + e);
+          LOG.trace("TRACE", e);
+        } catch(Throwable t) {
+          if (!handlerImpl.isAlive()) {
+            LOG.info(this + " is stopped.");
+            break;
+          }
+          ExitUtils.terminate(1, this + " is terminating.", t, LOG);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
new file mode 100644
index 0000000..9302051
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public class SimulatedClientRequestReply
+    extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
+    implements RaftClientRequestSender {
+  SimulatedClientRequestReply(Collection<RaftPeer> allPeers,
+                              int simulateLatencyMs) {
+    super(allPeers, simulateLatencyMs);
+  }
+
+  @Override
+  public void addServers(Iterable<RaftPeer> servers) {
+    // do nothing
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
new file mode 100644
index 0000000..559c1e6
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftRpcMessage;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.Timestamp;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SimulatedRequestReply<REQUEST extends RaftRpcMessage,
+    REPLY extends RaftRpcMessage> {
+  public static final String SIMULATE_LATENCY_KEY
+      = SimulatedRequestReply.class.getName() + ".simulateLatencyMs";
+  public static final int SIMULATE_LATENCY_DEFAULT
+      = RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
+  public static final long TIMEOUT = 3000L;
+
+  private static class ReplyOrException<REPLY> {
+    private final REPLY reply;
+    private final IOException ioe;
+
+    ReplyOrException(REPLY reply, IOException ioe) {
+      Preconditions.checkArgument(reply == null ^ ioe == null);
+      this.reply = reply;
+      this.ioe = ioe;
+    }
+  }
+
+  static class EventQueue<REQUEST, REPLY> {
+    private final BlockingQueue<REQUEST> requestQueue
+        = new LinkedBlockingQueue<>();
+    private final Map<REQUEST, ReplyOrException<REPLY>> replyMap
+        = new ConcurrentHashMap<>();
+
+    /** Block takeRequest for the requests sent from this server. */
+    final AtomicBoolean blockTakeRequestFrom = new AtomicBoolean();
+    /** Block sendRequest for the requests sent to this server. */
+    final AtomicBoolean blockSendRequestTo = new AtomicBoolean();
+    /** Delay takeRequest for the requests sent to this server. */
+    final AtomicInteger delayTakeRequestTo = new AtomicInteger();
+    /** Delay takeRequest for the requests sent from this server. */
+    final AtomicInteger delayTakeRequestFrom = new AtomicInteger();
+
+    REPLY request(REQUEST request) throws InterruptedException, IOException {
+      requestQueue.put(request);
+      synchronized (this) {
+        final Timestamp startTime = new Timestamp();
+        while (startTime.elapsedTimeMs() < TIMEOUT &&
+            !replyMap.containsKey(request)) {
+          this.wait(TIMEOUT); // no need to be precise here
+        }
+      }
+
+      if (!replyMap.containsKey(request)) {
+        throw new IOException("Timeout while waiting for reply of request "
+            + request);
+      }
+      final ReplyOrException<REPLY> re = replyMap.remove(request);
+      if (re.ioe != null) {
+        throw re.ioe;
+      }
+      return re.reply;
+    }
+
+    REQUEST takeRequest() throws InterruptedException {
+      return requestQueue.take();
+    }
+
+    void reply(REQUEST request, REPLY reply, IOException ioe)
+        throws IOException {
+      replyMap.put(request, new ReplyOrException<>(reply, ioe));
+      synchronized (this) {
+        this.notifyAll();
+      }
+    }
+  }
+
+  private final Map<String, EventQueue<REQUEST, REPLY>> queues;
+  private final int simulateLatencyMs;
+
+  SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) {
+    queues = new ConcurrentHashMap<>();
+    for (RaftPeer peer : allPeers) {
+      queues.put(peer.getId(), new EventQueue<>());
+    }
+
+    this.simulateLatencyMs = simulateLatencyMs;
+  }
+
+  EventQueue<REQUEST, REPLY> getQueue(String qid) {
+    return queues.get(qid);
+  }
+
+  public REPLY sendRequest(REQUEST request) throws IOException {
+    final String qid = request.getReplierId();
+    final EventQueue<REQUEST, REPLY> q = queues.get(qid);
+    if (q == null) {
+      throw new IOException("The peer " + qid + " is not alive.");
+    }
+    try {
+      RaftTestUtil.block(q.blockSendRequestTo::get);
+      return q.request(request);
+    } catch (InterruptedException e) {
+      throw RaftUtils.toInterruptedIOException("", e);
+    }
+  }
+
+  public REQUEST takeRequest(String qid) throws IOException {
+    final EventQueue<REQUEST, REPLY> q = queues.get(qid);
+    if (q == null) {
+      throw new IOException("The RPC of " + qid + " has already shutdown.");
+    }
+
+    final REQUEST request;
+    try {
+      // delay request for testing
+      RaftTestUtil.delay(q.delayTakeRequestTo::get);
+
+      request = q.takeRequest();
+      Preconditions.checkState(qid.equals(request.getReplierId()));
+
+      // block request for testing
+      final EventQueue<REQUEST, REPLY> reqQ = 
queues.get(request.getRequestorId());
+      if (reqQ != null) {
+        RaftTestUtil.delay(reqQ.delayTakeRequestFrom::get);
+        RaftTestUtil.block(reqQ.blockTakeRequestFrom::get);
+      }
+    } catch (InterruptedException e) {
+      throw RaftUtils.toInterruptedIOException("", e);
+    }
+    return request;
+  }
+
+  public void sendReply(REQUEST request, REPLY reply, IOException ioe)
+      throws IOException {
+    if (reply != null) {
+      Preconditions.checkArgument(
+          request.getRequestorId().equals(reply.getRequestorId()));
+      Preconditions.checkArgument(
+          request.getReplierId().equals(reply.getReplierId()));
+    }
+    simulateLatency();
+    final String qid = request.getReplierId();
+    EventQueue<REQUEST, REPLY> q = queues.get(qid);
+    if (q != null) {
+      q.reply(request, reply, ioe);
+    }
+  }
+
+  public void shutdown(String id) {
+    queues.remove(id);
+  }
+
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    for (RaftPeer peer : newPeers) {
+      queues.put(peer.getId(), new EventQueue<>());
+    }
+  }
+
+  private void simulateLatency() throws IOException {
+    if (simulateLatencyMs > 0) {
+      int waitExpetation = simulateLatencyMs / 10;
+      int waitHalfRange = waitExpetation / 3;
+      int randomSleepMs = ThreadLocalRandom.current().nextInt(2 * 
waitHalfRange)
+          + waitExpetation - waitHalfRange;
+      try {
+        Thread.sleep(randomSleepMs);
+      } catch (InterruptedException ie) {
+        throw RaftUtils.toInterruptedIOException("", ie);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
new file mode 100644
index 0000000..d40cf44
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+class SimulatedServerRpc implements RaftServerRpc {
+  static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
+
+  private final RaftServerImpl server;
+  private final RequestHandler<RaftServerRequest, RaftServerReply> 
serverHandler;
+  private final RequestHandler<RaftClientRequest, RaftClientReply> 
clientHandler;
+  private final ExecutorService executor = Executors.newFixedThreadPool(3,
+      new ThreadFactoryBuilder().setDaemon(true).build());
+
+  SimulatedServerRpc(RaftServerImpl server,
+      SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply,
+      SimulatedRequestReply<RaftClientRequest, RaftClientReply> 
clientRequestReply) {
+    this.server = server;
+    this.serverHandler = new RequestHandler<>(server.getId(),
+        "serverHandler", serverRequestReply, serverHandlerImpl, 3);
+    this.clientHandler = new RequestHandler<>(server.getId(),
+        "clientHandler", clientRequestReply, clientHandlerImpl, 3);
+  }
+
+  @Override
+  public void start() {
+    serverHandler.startDaemon();
+    clientHandler.startDaemon();
+  }
+
+  private void interruptAndJoin() throws InterruptedException {
+    clientHandler.interruptAndJoinDaemon();
+    serverHandler.interruptAndJoinDaemon();
+  }
+
+  @Override
+  public void close() {
+    try {
+      interruptAndJoin();
+      executor.shutdown();
+      executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+    }
+    clientHandler.shutdown();
+    serverHandler.shutdown();
+  }
+
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return null;
+  }
+
+  @Override
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request)
+      throws IOException {
+    RaftServerReply reply = serverHandler.getRpc()
+        .sendRequest(new RaftServerRequest(request));
+    return reply.getAppendEntries();
+  }
+
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request)
+      throws IOException {
+    RaftServerReply reply = serverHandler.getRpc()
+        .sendRequest(new RaftServerRequest(request));
+    return reply.getInstallSnapshot();
+  }
+
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+      throws IOException {
+    RaftServerReply reply = serverHandler.getRpc()
+        .sendRequest(new RaftServerRequest(request));
+    return reply.getRequestVote();
+  }
+
+  @Override
+  public void addPeers(Iterable<RaftPeer> peers) {
+    // do nothing
+  }
+
+  final RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply> 
serverHandlerImpl
+      = new RequestHandler.HandlerInterface<RaftServerRequest, 
RaftServerReply>() {
+    @Override
+    public boolean isAlive() {
+      return server.isAlive();
+    }
+
+    @Override
+    public RaftServerReply handleRequest(RaftServerRequest r)
+        throws IOException {
+      if (r.isAppendEntries()) {
+        return new RaftServerReply(server.appendEntries(r.getAppendEntries()));
+      } else if (r.isRequestVote()) {
+        return new RaftServerReply(server.requestVote(r.getRequestVote()));
+      } else if (r.isInstallSnapshot()) {
+        return new 
RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
+      } else {
+        throw new IllegalStateException("unexpected state");
+      }
+    }
+  };
+
+  final RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply> 
clientHandlerImpl
+      = new RequestHandler.HandlerInterface<RaftClientRequest, 
RaftClientReply>() {
+    @Override
+    public boolean isAlive() {
+      return server.isAlive();
+    }
+
+    @Override
+    public RaftClientReply handleRequest(RaftClientRequest request)
+        throws IOException {
+      final CompletableFuture<RaftClientReply> future;
+      if (request instanceof SetConfigurationRequest) {
+        future = server.setConfigurationAsync((SetConfigurationRequest) 
request);
+      } else {
+        future = server.submitClientRequestAsync(request);
+      }
+
+      future.whenCompleteAsync((reply, exception) -> {
+        try {
+          IOException e = null;
+          if (exception != null) {
+            e = exception instanceof IOException ?
+                (IOException) exception : new IOException(exception);
+          }
+          clientHandler.getRpc().sendReply(request, reply, e);
+        } catch (IOException e) {
+          LOG.warn("Failed to send reply {} for request {} due to exception 
{}",
+              reply, request, e);
+        }
+      }, executor);
+      return null;
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java
new file mode 100644
index 0000000..412fb65
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestNotLeaderExceptionWithSimulation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+
+import java.io.IOException;
+
+public class TestNotLeaderExceptionWithSimulation extends 
RaftNotLeaderExceptionBaseTest {
+  @Override
+  public MiniRaftCluster initCluster() throws IOException {
+    String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
+    return new MiniRaftClusterWithSimulatedRpc(s, new RaftProperties(), true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
new file mode 100644
index 0000000..b8bd679
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftReconfigurationWithSimulatedRpc.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import java.io.IOException;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+
+public class TestRaftReconfigurationWithSimulatedRpc
+    extends RaftReconfigurationBaseTest {
+  @Override
+  public MiniRaftCluster getCluster(int peerNum) throws IOException {
+    return new MiniRaftClusterWithSimulatedRpc(peerNum, prop);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
new file mode 100644
index 0000000..f2d5cfb
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftSnapshotWithSimulatedRpc.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+
+import java.io.IOException;
+
+public class TestRaftSnapshotWithSimulatedRpc extends RaftSnapshotBaseTest {
+  @Override
+  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
+      throws IOException {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(numServer, prop, 
true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
new file mode 100644
index 0000000..29ef6ed
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestRaftWithSimulatedRpc extends RaftBasicTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithSimulatedRpc cluster;
+
+  public TestRaftWithSimulatedRpc() throws IOException {
+    final RaftProperties properties = getProperties();
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      // turn off simulate latency half of the times.
+      properties.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
+    }
+    cluster = new MiniRaftClusterWithSimulatedRpc(NUM_SERVERS, properties);
+  }
+
+  @Override
+  public MiniRaftClusterWithSimulatedRpc getCluster() {
+    return cluster;
+  }
+}

Reply via email to