Repository: incubator-ratis
Updated Branches:
  refs/heads/master c7c4d1140 -> d7fb566c4


RATIS-28. Create RaftServerRpc using ServerFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/d7fb566c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/d7fb566c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/d7fb566c

Branch: refs/heads/master
Commit: d7fb566c4a0571b92354f9bfb57cdb0a3cd315e4
Parents: c7c4d11
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Fri Feb 24 16:17:19 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Fri Feb 24 16:17:19 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/RaftConfigKeys.java   |  42 ++++++++
 .../java/org/apache/ratis/conf/ConfUtils.java   |  21 ++--
 .../java/org/apache/ratis/util/NetUtils.java    |  16 ++-
 .../ratis/examples/RaftExamplesTestUtil.java    |   1 -
 .../org/apache/ratis/grpc/RaftGRpcService.java  |  33 ++----
 .../ratis/grpc/server/GrpcServerFactory.java    |  15 ++-
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     |  64 +++---------
 .../apache/ratis/hadooprpc/HadoopFactory.java   |  40 ++++++++
 .../server/HadoopRpcServerConfigKeys.java       |   2 +-
 .../hadooprpc/server/HadoopRpcService.java      |   7 +-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |  61 +++--------
 .../org/apache/ratis/netty/NettyConfigKeys.java |  42 ++++++++
 .../org/apache/ratis/netty/NettyFactory.java    |  30 ++++++
 .../ratis/netty/server/NettyRpcService.java     |  10 +-
 .../ratis/netty/MiniRaftClusterWithNetty.java   |  57 ++---------
 .../org/apache/ratis/server/RaftServer.java     |  11 +-
 .../ratis/server/RaftServerConfigKeys.java      |  11 +-
 .../org/apache/ratis/server/RaftServerRpc.java  |  23 +----
 .../ratis/server/impl/ConfigurationManager.java |  11 +-
 .../ratis/server/impl/RaftServerImpl.java       |  97 ++++++++----------
 .../apache/ratis/server/impl/ServerFactory.java |   7 +-
 .../apache/ratis/server/impl/ServerState.java   |   8 --
 .../java/org/apache/ratis/MiniRaftCluster.java  | 101 +++++++++----------
 .../MiniRaftClusterWithSimulatedRpc.java        |  44 ++++----
 .../simulation/SimulatedClientRequestReply.java |  10 +-
 .../simulation/SimulatedRequestReply.java       |  14 +--
 .../server/simulation/SimulationFactory.java    |  41 ++++++++
 27 files changed, 424 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java 
b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
new file mode 100644
index 0000000..f3a5942
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.conf.ConfUtils;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public interface RaftConfigKeys {
+  String PREFIX = "raft";
+
+  abstract class Rpc {
+    public static final String PREFIX = RaftConfigKeys.PREFIX + ".rpc";
+
+    public static final String TYPE_KEY = PREFIX + ".type";
+    public static final RpcType TYPE_DEFAULT = RpcType.GRPC;
+
+    public static RpcType type(BiFunction<String, RpcType, RpcType> 
getRpcType) {
+      return ConfUtils.get(getRpcType, TYPE_KEY, TYPE_DEFAULT);
+    }
+
+    public static void setType(BiConsumer<String, RpcType> setRpcType, RpcType 
type) {
+      ConfUtils.set(setRpcType, TYPE_KEY, type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index e43ac60..f3870e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -27,10 +27,9 @@ public abstract class ConfUtils {
     return value;
   }
 
-  public static String getString(
-      BiFunction<String, String, String> getString,
-      String key, String defaultValue) {
-    final String value = getString.apply(key, defaultValue);
+  public static <T> T get(BiFunction<String, T, T> getString,
+      String key, T defaultValue) {
+    final T value = getString.apply(key, defaultValue);
     LOG.info(key + " = " + value);
     return value;
   }
@@ -38,13 +37,17 @@ public abstract class ConfUtils {
   public static InetSocketAddress getInetSocketAddress(
       BiFunction<String, String, String> getString,
       String key, String defaultValue) {
-    return NetUtils.createSocketAddr(getString(getString, key, defaultValue));
+    return NetUtils.createSocketAddr(get(getString, key, defaultValue));
+  }
+
+  public static void setInt(BiConsumer<String, Integer> setInt,
+      String key, int value) {
+    setInt.accept(key, value);
+    LOG.info("set " + key + " = " + value);
   }
 
-  public static void setString(
-      BiConsumer<String, String> setString,
-      String key, String value) {
-    setString.accept(key, value);
+  public static <T> void set(BiConsumer<String, T> set, String key, T value) {
+    set.accept(key, value);
     LOG.info("set " + key + " = " + value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
index b6634b6..6f0bc2b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/NetUtils.java
@@ -21,10 +21,8 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
+import java.io.IOException;
+import java.net.*;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -148,4 +146,14 @@ public abstract class NetUtils {
     }
     return addr;
   }
+
+  public static InetSocketAddress createLocalServerAddress() {
+    try(final ServerSocket s = new ServerSocket()) {
+      s.setReuseAddress(true);
+      s.bind(null);
+      return (InetSocketAddress) s.getLocalSocketAddress();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
index 873c0da..f54b766 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.stream.Collectors;
 
 public class RaftExamplesTestUtil {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftExamplesTestUtil.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 473a5c6..22b295c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.grpc;
 
 import com.google.common.base.Preconditions;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.RpcType;
 import org.apache.ratis.grpc.client.RaftClientProtocolService;
 import org.apache.ratis.grpc.server.RaftServerProtocolClient;
@@ -51,28 +50,7 @@ public class RaftGRpcService implements RaftServerRpc {
       RaftGRpcService.class.getSimpleName() + ".sendRequest";
 
   public static class Builder extends 
RaftServerRpc.Builder<Builder,RaftGRpcService> {
-    private int maxMessageSize = RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT;
-
-    private Builder() {
-      super(RAFT_GRPC_SERVER_PORT_DEFAULT);
-    }
-
-    public int getMaxMessageSize() {
-      return maxMessageSize;
-    }
-
-    public Builder setMaxMessageSize(int maxMessageSize) {
-      this.maxMessageSize = maxMessageSize;
-      return this;
-    }
-
-    public Builder setFromRaftProperties(RaftProperties properties) {
-      setPort(properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
-          RAFT_GRPC_SERVER_PORT_DEFAULT));
-      setMaxMessageSize(properties.getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
-          RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT));
-      return this;
-    }
+    private Builder() {}
 
     @Override
     public Builder getThis() {
@@ -81,7 +59,7 @@ public class RaftGRpcService implements RaftServerRpc {
 
     @Override
     public RaftGRpcService build() {
-      return new RaftGRpcService(getServer(), getPort(), getMaxMessageSize());
+      return new RaftGRpcService(getServer());
     }
   }
 
@@ -95,6 +73,13 @@ public class RaftGRpcService implements RaftServerRpc {
       Collections.synchronizedMap(new HashMap<>());
   private final RaftPeerId selfId;
 
+  private RaftGRpcService(RaftServer server) {
+    this(server,
+        server.getProperties().getInt(RAFT_GRPC_SERVER_PORT_KEY,
+            RAFT_GRPC_SERVER_PORT_DEFAULT),
+        server.getProperties().getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
+            RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT));
+  }
   private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) 
{
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
     selfId = raftServer.getId();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
index 09e3265..e280faf 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
@@ -17,11 +17,9 @@
  */
 package org.apache.ratis.grpc.server;
 
-import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.ServerFactory;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.grpc.RaftGRpcService;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.*;
 
 public class GrpcServerFactory implements ServerFactory {
   @Override
@@ -29,4 +27,11 @@ public class GrpcServerFactory implements ServerFactory {
                                     FollowerInfo f) {
     return new GRpcLogAppender(server, state, f);
   }
+
+  @Override
+  public RaftServerRpc newRaftServerRpc(RaftServerImpl server) {
+    return RaftGRpcService.newBuilder()
+        .setServer(server)
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index bab8ee8..c0c4f6d 100644
--- 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -17,9 +17,10 @@
  */
 package org.apache.ratis.grpc;
 
-import com.google.common.base.Preconditions;
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
@@ -28,13 +29,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.util.NetUtils;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithGRpc> FACTORY
@@ -42,6 +38,7 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
     @Override
     public MiniRaftClusterWithGRpc newCluster(
         String[] ids, RaftProperties prop, boolean formatted) {
+      RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.GRPC);
       return new MiniRaftClusterWithGRpc(ids, prop, formatted);
     }
   };
@@ -51,22 +48,15 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
 
   private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
       boolean formatted) {
-    super(ids, new RaftProperties(properties), formatted);
-    init(initRpcServices(getServers(), properties));
+    super(ids, properties, formatted);
   }
 
-  private static Map<RaftPeer, RaftGRpcService> initRpcServices(
-      Collection<RaftServerImpl> servers, RaftProperties prop) {
-    final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();
-
-    for (RaftServerImpl s : servers) {
-      final RaftGRpcService rpc = RaftGRpcService.newBuilder()
-          .setFromRaftProperties(prop)
-          .setServer(s)
-          .build();
-      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
-    }
-    return peerRpcs;
+  @Override
+  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+    final RaftServerImpl s = super.newRaftServer(id, format);
+    s.getProperties().setInt(
+        RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(s));
+    return s;
   }
 
   @Override
@@ -75,40 +65,18 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
   }
 
   @Override
-  protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-                                             Collection<RaftServerImpl> 
newServers, boolean startService)
-      throws IOException {
-    final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, 
properties);
-    for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) {
-      RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setServerRpc(entry.getValue());
+  protected Collection<RaftPeer> addNewPeers(
+      Collection<RaftServerImpl> newServers, boolean startService) {
+    final Collection<RaftPeer> peers = toRaftPeers(newServers);
+    for (RaftPeer p: peers) {
+      final RaftServerImpl server = servers.get(p.getId());
       if (!startService) {
         
BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString());
       } else {
         server.start();
       }
     }
-    return new ArrayList<>(peers.keySet());
-  }
-
-  @Override
-  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
-    RaftServerImpl server = servers.get(peer.getId());
-    int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort();
-    int oldPort = 
properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY,
-        RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT);
-    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port);
-    final RaftGRpcService rpc = RaftGRpcService.newBuilder()
-        .setFromRaftProperties(properties)
-        .setServer(server)
-        .build();
-    Preconditions.checkState(
-        rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
-        "address in the raft conf: %s, address in rpc server: %s",
-        peer.getAddress(), rpc.getInetSocketAddress().toString());
-    server.setServerRpc(rpc);
-    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort);
-    return server;
+    return peers;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
new file mode 100644
index 0000000..a083f05
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.hadooprpc.server.HadoopRpcService;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerFactory;
+
+public class HadoopFactory extends ServerFactory.BaseFactory {
+  private Configuration conf;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public RaftServerRpc newRaftServerRpc(RaftServerImpl server) {
+    return HadoopRpcService.newBuilder()
+        .setServer(server)
+        .setConf(conf)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
index 4cdbdb3..bfdf05b 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
@@ -50,7 +50,7 @@ public interface HadoopRpcServerConfigKeys {
     public static void setAddress(
         BiConsumer<String, String> setString,
         String address) {
-      ConfUtils.setString(setString, ADDRESS_KEY, address);
+      ConfUtils.set(setString, ADDRESS_KEY, address);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index e05e2a8..74716be 100644
--- 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -53,9 +53,7 @@ public class HadoopRpcService implements RaftServerRpc {
   public static class Builder extends RaftServerRpc.Builder<Builder, 
HadoopRpcService> {
     private Configuration conf;
 
-    private Builder() {
-      super(0);
-    }
+    private Builder() {}
 
     public Configuration getConf() {
       if (conf == null) {
@@ -117,7 +115,8 @@ public class HadoopRpcService implements RaftServerRpc {
     return ipcServerAddress;
   }
 
-  private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final 
Configuration conf)
+  private static RPC.Server newRpcServer(
+      RaftServerProtocol serverProtocol, final Configuration conf)
       throws IOException {
     final int handlerCount = 
HadoopRpcServerConfigKeys.Ipc.handlers(conf::getInt);
     final InetSocketAddress address = 
HadoopRpcServerConfigKeys.Ipc.address(conf::getTrimmed);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index f3ee3f6..157891f 100644
--- 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -17,26 +17,22 @@
  */
 package org.apache.ratis.hadooprpc;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender;
 import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
   static final Logger LOG = 
LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class);
 
@@ -55,6 +51,7 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
 
     public MiniRaftClusterWithHadoopRpc newCluster(
         String[] ids, RaftProperties prop, Configuration conf, boolean 
formatted) {
+      RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.HADOOP);
       HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
       return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted);
     }
@@ -67,53 +64,27 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
 
   private final Configuration hadoopConf;
 
-
   private MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties,
       Configuration hadoopConf, boolean formatted) {
     super(ids, properties, formatted);
     this.hadoopConf = hadoopConf;
-
-    init(initRpcServices(getServers(), hadoopConf));
-  }
-
-  private static Map<RaftPeer, HadoopRpcService> initRpcServices(
-      Collection<RaftServerImpl> servers, Configuration hadoopConf) {
-    final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>();
-
-    for(RaftServerImpl s : servers) {
-      final HadoopRpcService rpc = HadoopRpcService.newBuilder()
-          .setServer(s)
-          .setConf(hadoopConf)
-          .build();
-      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
-    }
-    return peerRpcs;
+    getServers().stream().forEach(s -> setConf(s));
   }
 
-  @Override
-  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
-    Configuration hconf = new Configuration(hadoopConf);
-    HadoopRpcServerConfigKeys.Ipc.setAddress(hconf::set, peer.getAddress());
-
-    RaftServerImpl server = servers.get(peer.getId());
-    final HadoopRpcService rpc = HadoopRpcService.newBuilder()
-        .setServer(server)
-        .setConf(hconf)
-        .build();
-    Preconditions.checkState(
-        rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
-        "address in the raft conf: %s, address in rpc server: %s",
-        peer.getAddress(), rpc.getInetSocketAddress());
-    server.setServerRpc(rpc);
-    return server;
+  private void setConf(RaftServerImpl server) {
+    final Configuration conf = new Configuration(hadoopConf);
+    final String address = "0.0.0.0:" + getPort(server);
+    HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, address);
+    ((HadoopFactory)server.getFactory()).setConf(conf);
   }
 
   @Override
-  public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-                                          Collection<RaftServerImpl> 
newServers, boolean startService)
-      throws IOException {
-    return addNewPeers(initRpcServices(newServers, hadoopConf),
-        newServers, startService);
+  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+    final RaftServerImpl s = super.newRaftServer(id, format);
+    if (hadoopConf != null) {
+      setConf(s);
+    }
+    return s;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
new file mode 100644
index 0000000..07d131c
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.conf.ConfUtils;
+
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+public interface NettyConfigKeys {
+  String PREFIX = "raft.netty";
+
+  abstract class Server {
+    static String PREFIX = NettyConfigKeys.PREFIX + ".server";
+
+    public static String PORT_KEY = PREFIX + ".port";
+    public static int PORT_DEFAULT = 0;
+
+    public static int port(BiFunction<String, Integer, Integer> getInt) {
+      return ConfUtils.getInt(getInt, PORT_KEY, PORT_DEFAULT, 0, 65536);
+    }
+
+    public static void setPort(BiConsumer<String, Integer> setString, int 
port) {
+      ConfUtils.setInt(setString, PORT_KEY, port);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
new file mode 100644
index 0000000..6265f02
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.netty.server.NettyRpcService;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerFactory;
+
+public class NettyFactory extends ServerFactory.BaseFactory {
+  @Override
+  public RaftServerRpc newRaftServerRpc(RaftServerImpl server) {
+    return NettyRpcService.newBuilder().setServer(server).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 5a2bac5..140cbb1 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -26,6 +26,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.RpcType;
 import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
@@ -58,9 +59,7 @@ public final class NettyRpcService implements RaftServerRpc {
   public static final String SEND_SERVER_REQUEST = CLASS_NAME + 
".sendServerRequest";
 
   public static class Builder extends RaftServerRpc.Builder<Builder, 
NettyRpcService> {
-    private Builder() {
-      super(0);
-    }
+    private Builder() {}
 
     @Override
     public Builder getThis() {
@@ -69,7 +68,7 @@ public final class NettyRpcService implements RaftServerRpc {
 
     @Override
     public NettyRpcService build() {
-      return new NettyRpcService(getServer(), getPort());
+      return new NettyRpcService(getServer());
     }
   }
 
@@ -98,7 +97,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   /** Constructs a netty server with the given port. */
-  private NettyRpcService(RaftServer server, int port) {
+  private NettyRpcService(RaftServer server) {
     this.server = server;
     this.id = server.getId();
 
@@ -117,6 +116,7 @@ public final class NettyRpcService implements RaftServerRpc 
{
       }
     };
 
+    final int port = 
NettyConfigKeys.Server.port(server.getProperties()::getInt);
     channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 4ddab6e..7167d0d 100644
--- 
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -18,22 +18,16 @@
 package org.apache.ratis.netty;
 
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.netty.client.NettyClientRequestSender;
 import org.apache.ratis.netty.server.NettyRpcService;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.util.NetUtils;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithNetty> FACTORY
@@ -41,6 +35,7 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
     @Override
     public MiniRaftClusterWithNetty newCluster(
         String[] ids, RaftProperties prop, boolean formatted) {
+      RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.NETTY);
       return new MiniRaftClusterWithNetty(ids, prop, formatted);
     }
   };
@@ -51,56 +46,16 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
   private MiniRaftClusterWithNetty(
       String[] ids, RaftProperties properties, boolean formatted) {
     super(ids, properties, formatted);
-    init(initRpcServices(getServers(), getConf()));
-  }
-
-  private static String getAddress(RaftPeerId id, RaftConfiguration conf) {
-    final RaftPeer peer = conf.getPeer(id);
-    if (peer != null) {
-      final String address = peer.getAddress();
-      if (address != null) {
-        return address;
-      }
-    }
-    return "0.0.0.0:0";
-  }
-
-  private static NettyRpcService newNettyRpcService(
-      RaftServerImpl s, RaftConfiguration conf) {
-    final String address = getAddress(s.getId(), conf);
-    final int port = NetUtils.newInetSocketAddress(address).getPort();
-    return NettyRpcService.newBuilder().setServer(s).setPort(port).build();
-  }
-
-  private static Map<RaftPeer, NettyRpcService> initRpcServices(
-      Collection<RaftServerImpl> servers, RaftConfiguration conf) {
-    final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>();
-
-    for (RaftServerImpl s : servers) {
-      final NettyRpcService rpc = newNettyRpcService(s, conf);
-      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
-    }
-
-    return peerRpcs;
   }
 
   @Override
-  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
-    final RaftServerImpl s = servers.get(peer.getId());
-    final NettyRpcService rpc = newNettyRpcService(s, conf);
-    s.setServerRpc(rpc);
+  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+    final RaftServerImpl s = super.newRaftServer(id, format);
+    NettyConfigKeys.Server.setPort(s.getProperties()::setInt, getPort(s));
     return s;
   }
 
   @Override
-  protected Collection<RaftPeer> addNewPeers(
-      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
-      boolean startService) throws IOException {
-    return addNewPeers(initRpcServices(newServers, conf),
-        newServers, startService);
-  }
-
-  @Override
   public RaftClientRequestSender getRaftClientRequestSender() {
     return new NettyClientRequestSender(getPeers());
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index bfbd75f..3eed3c1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -18,11 +18,13 @@
 package org.apache.ratis.server;
 
 import com.google.common.base.Preconditions;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
 import org.apache.ratis.protocol.RaftClientProtocol;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.statemachine.StateMachine;
@@ -31,13 +33,16 @@ import java.io.Closeable;
 import java.io.IOException;
 
 /** Raft server interface */
-public interface RaftServer extends Closeable, RaftServerProtocol,
+public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
   /** @return the server ID. */
   RaftPeerId getId();
 
-  /** Set server RPC service. */
-  void setServerRpc(RaftServerRpc serverRpc);
+  /** @return the server properties. */
+  RaftProperties getProperties();
+
+  /** @return the factory for creating server components. */
+  ServerFactory getFactory();
 
   /** Start this server. */
   void start();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 8e6d54f..1203cdd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -18,21 +18,16 @@
 package org.apache.ratis.server;
 
 import org.apache.ratis.RpcType;
-import org.apache.ratis.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
 
 public interface RaftServerConfigKeys {
 
   String PREFIX = "raft.server";
 
   enum Factory {
-    NETTY("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
+    NETTY("org.apache.ratis.netty.NettyFactory"),
     GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"),
-    HADOOP("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
-    SIMULATED("org.apache.ratis.server.impl.ServerFactory$BaseFactory");
+    HADOOP("org.apache.ratis.hadooprpc.HadoopFactory"),
+    SIMULATED("org.apache.ratis.server.simulation.SimulationFactory");
 
     public static String getKey(String rpcType) {
       return RaftServerConfigKeys.PREFIX + ".factory." + rpcType + ".class";

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index e68c536..40a8363 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -22,8 +22,8 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Objects;
 
 /**
  * An server-side interface for supporting different RPC implementations
@@ -33,15 +33,10 @@ public interface RaftServerRpc extends RaftServerProtocol, 
RpcType.Get, Closeabl
   /** To build {@link RaftServerRpc} objects. */
   abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
     private RaftServer server;
-    private int port;
-
-    /** Construct a builder with the default port. */
-    protected Builder(int defaultPort) {
-      this.port = defaultPort;
-    }
 
     public RaftServer getServer() {
-      return server;
+      return Objects.requireNonNull(server,
+          "The 'server' field is not initialized.");
     }
 
     public B setServer(RaftServer server) {
@@ -49,19 +44,9 @@ public interface RaftServerRpc extends RaftServerProtocol, 
RpcType.Get, Closeabl
       return getThis();
     }
 
-    public int getPort() {
-      return port;
-    }
-
-    /** Set the port for the server to listen to. */
-    public B setPort(int port) {
-      this.port = port;
-      return getThis();
-    }
-
     protected abstract B getThis();
 
-    public abstract RPC build() throws IOException;
+    public abstract RPC build();
   }
 
   /** Start the RPC service. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index 7769de1..c000238 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -29,7 +29,7 @@ import java.util.*;
  * entries.
  */
 public class ConfigurationManager {
-  private RaftConfiguration initialConf;
+  private final RaftConfiguration initialConf;
   private final NavigableMap<Long, RaftConfiguration> configurations =
       new TreeMap<>();
   /**
@@ -39,15 +39,6 @@ public class ConfigurationManager {
   private RaftConfiguration currentConf;
 
   ConfigurationManager(RaftConfiguration initialConf) {
-    setInitialConf(initialConf);
-  }
-
-  @VisibleForTesting
-  public synchronized void setInitialConf(RaftConfiguration initialConf) {
-    /**
-     * initialConf should actually be defined as "final". But for tests we want
-     * to change the initial configuration to reflect the correct port binding.
-     */
     this.initialConf = initialConf;
     this.currentConf = initialConf;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 152d6a5..c5fe336 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,44 +17,18 @@
  */
 package org.apache.ratis.server.impl;
 
-import static org.apache.ratis.util.LifeCycle.State.CLOSED;
-import static org.apache.ratis.util.LifeCycle.State.CLOSING;
-import static org.apache.ratis.util.LifeCycle.State.RUNNING;
-import static org.apache.ratis.util.LifeCycle.State.STARTING;
-
-import static 
org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.LeaderNotReadyException;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftException;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.ReconfigurationInProgressException;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -65,8 +39,18 @@ import org.apache.ratis.util.RaftUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static 
org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
+import static org.apache.ratis.util.LifeCycle.State.*;
 
 public class RaftServerImpl implements RaftServer {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerImpl.class);
@@ -82,6 +66,7 @@ public class RaftServerImpl implements RaftServer {
     LEADER, CANDIDATE, FOLLOWER
   }
 
+  private final RpcType rpcType;
   private final int minTimeoutMs;
   private final int maxTimeoutMs;
 
@@ -100,14 +85,15 @@ public class RaftServerImpl implements RaftServer {
   /** used when the peer is leader */
   private volatile LeaderState leaderState;
 
-  private RaftServerRpc serverRpc;
+  private final Supplier<RaftServerRpc> serverRpc;
 
-  private final Supplier<ServerFactory> factory ;
+  private final ServerFactory factory;
 
   RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
                  RaftConfiguration raftConf, RaftProperties properties)
       throws IOException {
     this.lifeCycle = new LifeCycle(id);
+    this.rpcType = RaftConfigKeys.Rpc.type(properties::getEnum);
     minTimeoutMs = properties.getInt(
         RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
         RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
@@ -119,12 +105,18 @@ public class RaftServerImpl implements RaftServer {
     this.properties = properties;
     this.stateMachine = stateMachine;
     this.state = new ServerState(id, raftConf, properties, this, stateMachine);
-    this.factory = RaftUtils.memoize(
-        () -> ServerFactory.Util.newServerFactory(getServerRpc().getRpcType(), 
properties));
+    this.factory = ServerFactory.Util.newServerFactory(rpcType, properties);
+    this.serverRpc = RaftUtils.memoize(() -> initRaftServerRpc());
   }
 
-  ServerFactory getFactory() {
-    return factory.get();
+  @Override
+  public RpcType getRpcType() {
+    return rpcType;
+  }
+
+  @Override
+  public ServerFactory getFactory() {
+    return factory;
   }
 
   int getMinTimeoutMs() {
@@ -144,26 +136,18 @@ public class RaftServerImpl implements RaftServer {
     return this.stateMachine;
   }
 
-  /**
-   * Used by tests to set initial raft configuration with correct port 
bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration conf) {
-    this.state.setInitialConf(conf);
-  }
-
-  @Override
-  public void setServerRpc(RaftServerRpc serverRpc) {
-    this.serverRpc = serverRpc;
+  private RaftServerRpc initRaftServerRpc() {
+    final RaftServerRpc rpc = getFactory().newRaftServerRpc(this);
     // add peers into rpc service
     RaftConfiguration conf = getRaftConf();
     if (conf != null) {
-      serverRpc.addPeers(conf.getPeers());
+      rpc.addPeers(conf.getPeers());
     }
+    return rpc;
   }
 
   public RaftServerRpc getServerRpc() {
-    return serverRpc;
+    return serverRpc.get();
   }
 
   @Override
@@ -188,7 +172,7 @@ public class RaftServerImpl implements RaftServer {
     heartbeatMonitor = new FollowerState(this);
     heartbeatMonitor.start();
 
-    serverRpc.start();
+    getServerRpc().start();
     lifeCycle.transition(RUNNING);
   }
 
@@ -200,7 +184,7 @@ public class RaftServerImpl implements RaftServer {
   private void startInitializing() {
     role = Role.FOLLOWER;
     // do not start heartbeatMonitoring
-    serverRpc.start();
+    getServerRpc().start();
   }
 
   public ServerState getState() {
@@ -224,7 +208,7 @@ public class RaftServerImpl implements RaftServer {
         shutdownElectionDaemon();
         shutdownLeaderState();
 
-        serverRpc.close();
+        getServerRpc().close();
         state.close();
       } catch (Exception ignored) {
         LOG.warn("Failed to kill " + state.getSelfId(), ignored);
@@ -831,6 +815,7 @@ public class RaftServerImpl implements RaftServer {
     return null;
   }
 
+  @Override
   public RaftProperties getProperties() {
     return this.properties;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
index 38caba7..2b7fb77 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -19,9 +19,12 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.RpcType;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.util.RaftUtils;
 
+import java.io.IOException;
 import java.util.Objects;
 
 /** A factory interface for creating server components. */
@@ -29,7 +32,9 @@ public interface ServerFactory {
   /** Create a new {@link LogAppender}. */
   LogAppender newLogAppender(RaftServerImpl server, LeaderState state, 
FollowerInfo f);
 
-  class BaseFactory implements ServerFactory {
+  RaftServerRpc newRaftServerRpc(RaftServerImpl server);
+
+  abstract class BaseFactory implements ServerFactory {
     @Override
     public LogAppender newLogAppender(
         RaftServerImpl server, LeaderState state, FollowerInfo f) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 150b2c1..dd2d784 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -100,14 +100,6 @@ public class ServerState implements Closeable {
          lastApplied, prop);
   }
 
-  /**
-   * Used by tests to set initial raft configuration with correct port 
bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration initialConf) {
-    configurationManager.setInitialConf(initialConf);
-  }
-
   private long initStatemachine(StateMachine sm, RaftProperties properties)
       throws IOException {
     sm.initialize(selfId, properties, storage);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index ce3cafc..afd6eca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -24,18 +24,14 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.RaftConfiguration;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.server.impl.*;
 import org.apache.ratis.server.storage.MemoryRaftLog;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.statemachine.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.RaftUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -43,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -73,25 +70,30 @@ public abstract class MiniRaftCluster {
       super(ids, properties, formatted);
     }
 
-    protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws 
IOException;
-
-    @Override
-    protected void setPeerRpc() throws IOException {
-      for (RaftPeer p : conf.getPeers()) {
-        setPeerRpc(p);
-      }
-    }
-
     @Override
     public void restartServer(String id, boolean format) throws IOException {
       super.restartServer(id, format);
-      setPeerRpc(conf.getPeer(new RaftPeerId(id))).start();
+      getServer(id).start();
     }
 
     @Override
     public void setBlockRequestsFrom(String src, boolean block) {
       RaftTestUtil.setBlockRequestsFrom(src, block);
     }
+
+    public static int getPort(RaftServerImpl server) {
+      final int port = getPort(server.getId(), 
server.getState().getRaftConf());
+      LOG.info(server.getId() + "(" + server.getRpcType() + "), port=" + port);
+      return port;
+    }
+
+    public static int getPort(RaftPeerId id, RaftConfiguration conf) {
+      final RaftPeer peer = conf.getPeer(id);
+      final String address = peer != null? peer.getAddress(): null;
+      final InetSocketAddress inetAddress = address != null?
+          NetUtils.createSocketAddr(address): 
NetUtils.createLocalServerAddress();
+      return inetAddress.getPort();
+    }
   }
 
   public static class PeerChanges {
@@ -112,7 +114,9 @@ public abstract class MiniRaftCluster {
 
   public static RaftConfiguration initConfiguration(String[] ids) {
     return RaftConfiguration.newBuilder()
-        .setConf(Arrays.stream(ids).map(id -> new RaftPeer(new RaftPeerId(id)))
+        .setConf(Arrays.stream(ids)
+            .map(id -> new RaftPeerId(id))
+            .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
             .collect(Collectors.toList()))
         .build();
   }
@@ -154,16 +158,6 @@ public abstract class MiniRaftCluster {
     ExitUtils.disableSystemExit();
   }
 
-  protected <RPC extends  RaftServerRpc> void init(Map<RaftPeer, RPC> peers) {
-    LOG.info("peers = " + peers.keySet());
-    conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build();
-    for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) {
-      final RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setInitialConf(conf);
-      server.setServerRpc(entry.getValue());
-    }
-  }
-
   public void start() {
     LOG.info("Starting " + getClass().getSimpleName());
     servers.values().forEach(RaftServerImpl::start);
@@ -188,11 +182,11 @@ public abstract class MiniRaftCluster {
       servers.put(id, newRaftServer(id, format));
     }
 
-    setPeerRpc();
+    initRpc();
     start();
   }
 
-  protected abstract void setPeerRpc() throws IOException;
+  protected void initRpc() {}
 
   public int getMaxTimeout() {
     return properties.getInt(
@@ -204,20 +198,19 @@ public abstract class MiniRaftCluster {
     return conf;
   }
 
-  private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
-    final RaftServerImpl s;
+  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
     try {
       final String dirStr = testBaseDir + id;
       if (format) {
         formatDir(dirStr);
       }
-      properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
+      final RaftProperties prop = new RaftProperties(properties);
+      prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
       final StateMachine stateMachine = getStateMachine4Test(properties);
-      s = ServerImplUtils.newRaftServer(id, stateMachine, conf, properties);
+      return ServerImplUtils.newRaftServer(id, stateMachine, conf, prop);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    return s;
   }
 
   static StateMachine getStateMachine4Test(RaftProperties properties) {
@@ -230,23 +223,22 @@ public abstract class MiniRaftCluster {
 
   public abstract RaftClientRequestSender getRaftClientRequestSender();
 
-  protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers(
-      Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers,
-      boolean startService) throws IOException {
-    for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) {
-      RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setServerRpc(entry.getValue());
-    }
+  public static Collection<RaftPeer> toRaftPeers(
+      Collection<RaftServerImpl> servers) {
+    return servers.stream()
+        .map(s -> new RaftPeer(s.getId(), 
s.getServerRpc().getInetSocketAddress()))
+        .collect(Collectors.toList());
+  }
+
+  protected Collection<RaftPeer> addNewPeers(
+      Collection<RaftServerImpl> newServers, boolean startService) {
+    final Collection<RaftPeer> peers = toRaftPeers(newServers);
     if (startService) {
       newServers.forEach(RaftServerImpl::start);
     }
-    return new ArrayList<>(newPeers.keySet());
+    return peers;
   }
 
-  protected abstract Collection<RaftPeer> addNewPeers(
-      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
-      boolean startService) throws IOException;
-
   public PeerChanges addNewPeers(int number, boolean startNewPeer)
       throws IOException {
     return addNewPeers(generateIds(number, servers.size()), startNewPeer);
@@ -255,22 +247,19 @@ public abstract class MiniRaftCluster {
   public PeerChanges addNewPeers(String[] ids,
       boolean startNewPeer) throws IOException {
     LOG.info("Add new peers {}", Arrays.asList(ids));
-    Collection<RaftPeer> newPeers = new ArrayList<>(ids.length);
-    for (String id : ids) {
-      newPeers.add(new RaftPeer(new RaftPeerId(id)));
-    }
 
     // create and add new RaftServers
     final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
-    for (RaftPeer p : newPeers) {
-      RaftServerImpl newServer = newRaftServer(p.getId(), true);
-      Preconditions.checkArgument(!servers.containsKey(p.getId()));
-      servers.put(p.getId(), newServer);
+    for (String id : ids) {
+      Preconditions.checkArgument(!servers.containsKey(id));
+
+      final RaftPeerId peerId = new RaftPeerId(id);
+      final RaftServerImpl newServer = newRaftServer(peerId, true);
+      servers.put(peerId, newServer);
       newServers.add(newServer);
     }
 
-    // for hadoop-rpc-enabled peer, we assign inetsocketaddress here
-    newPeers = addNewPeers(newPeers, newServers, startNewPeer);
+    final Collection<RaftPeer> newPeers = addNewPeers(newServers, 
startNewPeer);
 
     final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
     newPeers.addAll(conf.getPeers());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 1745321..121159b 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -18,6 +18,8 @@
 package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
@@ -39,6 +41,7 @@ public class MiniRaftClusterWithSimulatedRpc extends 
MiniRaftCluster {
     @Override
     public MiniRaftClusterWithSimulatedRpc newCluster(
         String[] ids, RaftProperties prop, boolean formatted) {
+      RaftConfigKeys.Rpc.setType(prop::setEnum, RpcType.SIMULATED);
       if (ThreadLocalRandom.current().nextBoolean()) {
         // turn off simulate latency half of the times.
         prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
@@ -56,28 +59,24 @@ public class MiniRaftClusterWithSimulatedRpc extends 
MiniRaftCluster {
     initRpc();
   }
 
-  private void initRpc() {
-    final Collection<RaftPeer> peers = getConf().getPeers();
+  @Override
+  protected void initRpc() {
     final int simulateLatencyMs = properties.getInt(
         SimulatedRequestReply.SIMULATE_LATENCY_KEY,
         SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT);
     LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = "
         + simulateLatencyMs);
-    serverRequestReply = new SimulatedRequestReply<>(peers, simulateLatencyMs);
-    client2serverRequestReply = new SimulatedClientRequestReply(peers,
-        simulateLatencyMs);
-
-    setRpcServers(getServers());
+    serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs);
+    client2serverRequestReply = new 
SimulatedClientRequestReply(simulateLatencyMs);
+    getServers().stream().forEach(s -> initRpc(s));
+    addPeersToRpc(toRaftPeers(getServers()));
   }
 
-  private void setRpcServers(Collection<RaftServerImpl> newServers) {
-    newServers.forEach(s -> s.setServerRpc(
-        new SimulatedServerRpc(s, serverRequestReply, 
client2serverRequestReply)));
-  }
-
-  @Override
-  protected void setPeerRpc() {
-    initRpc();
+  private void initRpc(RaftServerImpl s) {
+    if (serverRequestReply != null) {
+      ((SimulationFactory)s.getFactory()).initRpc(
+          serverRequestReply, client2serverRequestReply);
+    }
   }
 
   private void addPeersToRpc(Collection<RaftPeer> peers) {
@@ -86,20 +85,25 @@ public class MiniRaftClusterWithSimulatedRpc extends 
MiniRaftCluster {
   }
 
   @Override
+  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+    final RaftServerImpl s = super.newRaftServer(id, format);
+    initRpc(s);
+    return s;
+  }
+
+  @Override
   public void restartServer(String id, boolean format) throws IOException {
     super.restartServer(id, format);
     RaftServerImpl s = getServer(id);
     addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id))));
-    s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply,
-        client2serverRequestReply));
     s.start();
   }
 
   @Override
-  public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-                                          Collection<RaftServerImpl> 
newServers, boolean startService) {
+  public Collection<RaftPeer> addNewPeers(
+      Collection<RaftServerImpl> newServers, boolean startService) {
+    final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
     addPeersToRpc(newPeers);
-    setRpcServers(newServers);
     if (startService) {
       newServers.forEach(RaftServerImpl::start);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
index 9302051..65fe7ad 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRequestReply.java
@@ -22,15 +22,11 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 
-import java.io.IOException;
-import java.util.Collection;
-
-public class SimulatedClientRequestReply
+class SimulatedClientRequestReply
     extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
     implements RaftClientRequestSender {
-  SimulatedClientRequestReply(Collection<RaftPeer> allPeers,
-                              int simulateLatencyMs) {
-    super(allPeers, simulateLatencyMs);
+  SimulatedClientRequestReply(int simulateLatencyMs) {
+    super(simulateLatencyMs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
index f11c626..95d3efa 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server.simulation;
 
 import com.google.common.base.Preconditions;
-
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftRpcMessage;
@@ -27,7 +26,6 @@ import org.apache.ratis.util.RaftUtils;
 import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -37,7 +35,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class SimulatedRequestReply<REQUEST extends RaftRpcMessage,
+class SimulatedRequestReply<REQUEST extends RaftRpcMessage,
     REPLY extends RaftRpcMessage> {
   public static final String SIMULATE_LATENCY_KEY
       = SimulatedRequestReply.class.getName() + ".simulateLatencyMs";
@@ -105,15 +103,11 @@ public class SimulatedRequestReply<REQUEST extends 
RaftRpcMessage,
     }
   }
 
-  private final Map<String, EventQueue<REQUEST, REPLY>> queues;
+  private final Map<String, EventQueue<REQUEST, REPLY>> queues
+      = new ConcurrentHashMap<>();
   private final int simulateLatencyMs;
 
-  SimulatedRequestReply(Collection<RaftPeer> allPeers, int simulateLatencyMs) {
-    queues = new ConcurrentHashMap<>();
-    for (RaftPeer peer : allPeers) {
-      queues.put(peer.getId().toString(), new EventQueue<>());
-    }
-
+  SimulatedRequestReply(int simulateLatencyMs) {
     this.simulateLatencyMs = simulateLatencyMs;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d7fb566c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java
new file mode 100644
index 0000000..d0b3d38
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulationFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerFactory;
+
+import java.util.Objects;
+
+public class SimulationFactory extends ServerFactory.BaseFactory {
+  private SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply;
+  private SimulatedClientRequestReply client2serverRequestReply;
+
+  public void initRpc(
+      SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply,
+      SimulatedClientRequestReply client2serverRequestReply) {
+    this.serverRequestReply = Objects.requireNonNull(serverRequestReply);
+    this.client2serverRequestReply = 
Objects.requireNonNull(client2serverRequestReply);
+  }
+
+  @Override
+  public RaftServerRpc newRaftServerRpc(RaftServerImpl server) {
+    return new SimulatedServerRpc(server, serverRequestReply, 
client2serverRequestReply);
+  }
+}

Reply via email to