Rename RaftServer to RaftServerImpl (will add an interface.)

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/22f3ee0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/22f3ee0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/22f3ee0b

Branch: refs/heads/master
Commit: 22f3ee0b46a5485deafa84939eb5290938b5552b
Parents: c36810e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Mon Jan 2 20:10:28 2017 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Mon Jan 2 20:10:28 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/raft/TestBatchAppend.java   |   4 +-
 .../org/apache/raft/TestRestartRaftPeer.java    |   4 +-
 .../TestRaftStateMachineException.java          |   4 +-
 .../org/apache/raft/grpc/RaftGRpcService.java   |   4 +-
 .../raft/grpc/server/GRpcLogAppender.java       |   6 +-
 .../server/PipelinedLogAppenderFactory.java     |   6 +-
 .../raft/grpc/MiniRaftClusterWithGRpc.java      |  14 +-
 .../org/apache/raft/grpc/TestRaftStream.java    |  12 +-
 .../org/apache/raft/grpc/TestRaftWithGrpc.java  |   4 +-
 .../raft/hadooprpc/server/HadoopRpcService.java |   4 +-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |  12 +-
 .../raft/hadooprpc/TestRaftWithHadoopRpc.java   |   4 +-
 .../raft/netty/server/NettyRpcService.java      |   4 +-
 .../raft/netty/MiniRaftClusterWithNetty.java    |  14 +-
 .../apache/raft/netty/TestRaftWithNetty.java    |   4 +-
 .../apache/raft/server/impl/FollowerState.java  |   6 +-
 .../apache/raft/server/impl/LeaderElection.java |   4 +-
 .../apache/raft/server/impl/LeaderState.java    |   6 +-
 .../apache/raft/server/impl/LogAppender.java    |   6 +-
 .../raft/server/impl/LogAppenderFactory.java    |   8 +-
 .../raft/server/impl/PendingRequests.java       |   6 +-
 .../org/apache/raft/server/impl/RaftServer.java | 749 -------------------
 .../apache/raft/server/impl/RaftServerImpl.java | 749 +++++++++++++++++++
 .../raft/server/impl/RequestDispatcher.java     |   4 +-
 .../apache/raft/server/impl/ServerState.java    |  10 +-
 .../raft/server/impl/StateMachineUpdater.java   |   4 +-
 .../raft/server/storage/RaftLogWorker.java      |   8 +-
 .../raft/server/storage/SegmentedRaftLog.java   |   6 +-
 .../java/org/apache/raft/MiniRaftCluster.java   |  60 +-
 .../java/org/apache/raft/RaftBasicTests.java    |   8 +-
 .../raft/RaftNotLeaderExceptionBaseTest.java    |   4 +-
 .../test/java/org/apache/raft/RaftTestUtil.java |  22 +-
 .../impl/BlockRequestHandlingInjection.java     |   6 +-
 .../impl/RaftReconfigurationBaseTest.java       |   4 +-
 .../raft/server/impl/RaftServerTestUtil.java    |   2 +-
 .../MiniRaftClusterWithSimulatedRpc.java        |  10 +-
 .../server/simulation/SimulatedServerRpc.java   |   6 +-
 .../simulation/TestRaftWithSimulatedRpc.java    |   4 +-
 .../raft/statemachine/RaftSnapshotBaseTest.java |   4 +-
 .../raft/statemachine/TestStateMachine.java     |   8 +-
 40 files changed, 902 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
----------------------------------------------------------------------
diff --git a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java 
b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
index c730245..1fd8ba0 100644
--- a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
+++ b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
@@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.statemachine.SimpleStateMachine4Testing;
 import org.apache.raft.statemachine.StateMachine;
@@ -55,7 +55,7 @@ import static org.apache.raft.server.RaftServerConfigKeys.*;
 public class TestBatchAppend {
   static Logger LOG = LoggerFactory.getLogger(TestBatchAppend.class);
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java 
b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
index fadd7a8..e04d141 100644
--- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
+++ b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
@@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;
@@ -49,7 +49,7 @@ import java.util.Collection;
 public class TestRestartRaftPeer {
   static Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class);
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
 
b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
index 2fa6e53..0832579 100644
--- 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
+++ 
b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
@@ -24,7 +24,7 @@ import org.apache.raft.client.RaftClient;
 import org.apache.raft.examples.RaftExamplesTestUtil;
 import org.apache.raft.protocol.Message;
 import org.apache.raft.protocol.StateMachineException;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;
 import org.apache.raft.util.RaftUtils;
@@ -42,7 +42,7 @@ import static org.junit.Assert.fail;
 @RunWith(Parameterized.class)
 public class TestRaftStateMachineException {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
index c9a0daf..d465724 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
@@ -23,7 +23,7 @@ import org.apache.raft.grpc.client.RaftClientProtocolService;
 import org.apache.raft.grpc.server.RaftServerProtocolClient;
 import org.apache.raft.grpc.server.RaftServerProtocolService;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerRpc;
 import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.Server;
@@ -54,7 +54,7 @@ public class RaftGRpcService implements RaftServerRpc {
       Collections.synchronizedMap(new HashMap<>());
   private final String selfId;
 
-  public RaftGRpcService(RaftServer raftServer, RaftProperties properties) {
+  public RaftGRpcService(RaftServerImpl raftServer, RaftProperties properties) 
{
     int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
         RAFT_GRPC_SERVER_PORT_DEFAULT);
     int maxMessageSize = properties.getInt(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
index b171bed..a8a39bb 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
@@ -23,7 +23,7 @@ import org.apache.raft.grpc.RaftGrpcConfigKeys;
 import org.apache.raft.server.impl.FollowerInfo;
 import org.apache.raft.server.impl.LeaderState;
 import org.apache.raft.server.impl.LogAppender;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.shaded.io.grpc.Status;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto;
@@ -56,8 +56,8 @@ public class GRpcLogAppender extends LogAppender {
   private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
   private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
 
-  public GRpcLogAppender(RaftServer server, LeaderState leaderState,
-      FollowerInfo f) {
+  public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState,
+                         FollowerInfo f) {
     super(server, leaderState, f);
 
     RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
index 5f01980..cc2e513 100644
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
+++ 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
@@ -21,12 +21,12 @@ import org.apache.raft.server.impl.FollowerInfo;
 import org.apache.raft.server.impl.LeaderState;
 import org.apache.raft.server.impl.LogAppender;
 import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 
 public class PipelinedLogAppenderFactory implements LogAppenderFactory {
   @Override
-  public LogAppender getLogAppender(RaftServer server, LeaderState state,
-      FollowerInfo f) {
+  public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                                    FollowerInfo f) {
     return new GRpcLogAppender(server, state, f);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
index 420ee88..359dabd 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
@@ -28,7 +28,7 @@ import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.impl.BlockRequestHandlingInjection;
 import org.apache.raft.server.impl.DelayLocalExecutionInjection;
 import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.util.NetUtils;
 
 import java.io.IOException;
@@ -71,10 +71,10 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
   }
 
   private static Map<RaftPeer, RaftGRpcService> initRpcServices(
-      Collection<RaftServer> servers, RaftProperties prop) throws IOException {
+      Collection<RaftServerImpl> servers, RaftProperties prop) throws 
IOException {
     final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();
 
-    for (RaftServer s : servers) {
+    for (RaftServerImpl s : servers) {
       final RaftGRpcService rpc = new RaftGRpcService(s, prop);
       peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
     }
@@ -88,11 +88,11 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
 
   @Override
   protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-      Collection<RaftServer> newServers, boolean startService)
+                                             Collection<RaftServerImpl> 
newServers, boolean startService)
       throws IOException {
     final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, 
properties);
     for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) {
-      RaftServer server = servers.get(entry.getKey().getId());
+      RaftServerImpl server = servers.get(entry.getKey().getId());
       server.setServerRpc(entry.getValue());
       if (!startService) {
         
BlockRequestHandlingInjection.getInstance().blockReplier(server.getId());
@@ -104,8 +104,8 @@ public class MiniRaftClusterWithGRpc extends 
MiniRaftCluster.RpcBase {
   }
 
   @Override
-  protected RaftServer setPeerRpc(RaftPeer peer) throws IOException {
-    RaftServer server = servers.get(peer.getId());
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
+    RaftServerImpl server = servers.get(peer.getId());
     int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort();
     int oldPort = 
properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY,
         RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
index 0c85854..82a4e13 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
@@ -24,7 +24,7 @@ import org.apache.raft.grpc.client.AppendStreamer;
 import org.apache.raft.grpc.client.RaftOutputStream;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.storage.RaftLog;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.util.RaftUtils;
@@ -95,7 +95,7 @@ public class TestRaftStream {
     cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
 
     cluster.start();
-    RaftServer leader = waitForLeader(cluster);
+    RaftServerImpl leader = waitForLeader(cluster);
 
     int count = 1;
     try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1",
@@ -137,7 +137,7 @@ public class TestRaftStream {
     cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
     cluster.start();
 
-    RaftServer leader = waitForLeader(cluster);
+    RaftServerImpl leader = waitForLeader(cluster);
     RaftOutputStream out = new RaftOutputStream(prop, "writer",
         cluster.getPeers(), leader.getId());
 
@@ -215,7 +215,7 @@ public class TestRaftStream {
 
     cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
     cluster.start();
-    RaftServer leader = waitForLeader(cluster);
+    RaftServerImpl leader = waitForLeader(cluster);
 
     RaftOutputStream out = new RaftOutputStream(prop, "writer",
         cluster.getPeers(), leader.getId());
@@ -273,7 +273,7 @@ public class TestRaftStream {
     prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
     cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
     cluster.start();
-    final RaftServer leader = waitForLeader(cluster);
+    final RaftServerImpl leader = waitForLeader(cluster);
 
     final AtomicBoolean running  = new AtomicBoolean(true);
     final AtomicBoolean success = new AtomicBoolean(false);
@@ -301,7 +301,7 @@ public class TestRaftStream {
 
     // force change the leader
     RaftTestUtil.waitAndKillLeader(cluster, true);
-    final RaftServer newLeader = waitForLeader(cluster);
+    final RaftServerImpl newLeader = waitForLeader(cluster);
     Assert.assertNotEquals(leader.getId(), newLeader.getId());
     Thread.sleep(500);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
index b2d104b..c6667b4 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
@@ -22,7 +22,7 @@ import org.apache.raft.RaftBasicTests;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.raft.server.impl.BlockRequestHandlingInjection;
 import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -34,7 +34,7 @@ import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPEND
 
 public class TestRaftWithGrpc extends RaftBasicTests {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
   }
 
   private final MiniRaftClusterWithGRpc cluster;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
index ad4beec..eb92372 100644
--- 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
+++ 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
@@ -26,7 +26,7 @@ import org.apache.raft.hadooprpc.client.RaftClientProtocolPB;
 import 
org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.*;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerRpc;
 import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.com.google.protobuf.BlockingService;
@@ -56,7 +56,7 @@ public class HadoopRpcService implements RaftServerRpc {
 
   private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies;
 
-  public HadoopRpcService(RaftServer server, final Configuration conf)
+  public HadoopRpcService(RaftServerImpl server, final Configuration conf)
       throws IOException {
     this.proxies = new PeerProxyMap<>(
         p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index 7f7ef49..87b4d1c 100644
--- 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -27,7 +27,7 @@ import 
org.apache.raft.hadooprpc.client.HadoopClientRequestSender;
 import org.apache.raft.hadooprpc.server.HadoopRpcService;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.impl.DelayLocalExecutionInjection;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,10 +70,10 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
   }
 
   private static Map<RaftPeer, HadoopRpcService> initRpcServices(
-      Collection<RaftServer> servers, Configuration hadoopConf) throws 
IOException {
+      Collection<RaftServerImpl> servers, Configuration hadoopConf) throws 
IOException {
     final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>();
 
-    for(RaftServer s : servers) {
+    for(RaftServerImpl s : servers) {
       final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf);
       peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
     }
@@ -81,11 +81,11 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
   }
 
   @Override
-  protected RaftServer setPeerRpc(RaftPeer peer) throws IOException {
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
     Configuration hconf = new Configuration(hadoopConf);
     hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress());
 
-    RaftServer server = servers.get(peer.getId());
+    RaftServerImpl server = servers.get(peer.getId());
     final HadoopRpcService rpc = new HadoopRpcService(server, hconf);
     Preconditions.checkState(
         rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
@@ -97,7 +97,7 @@ public class MiniRaftClusterWithHadoopRpc extends 
MiniRaftCluster.RpcBase {
 
   @Override
   public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-      Collection<RaftServer> newServers, boolean startService)
+                                          Collection<RaftServerImpl> 
newServers, boolean startService)
       throws IOException {
     return addNewPeers(initRpcServices(newServers, hadoopConf),
         newServers, startService);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
index 3971274..4848196 100644
--- 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
+++ 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
@@ -22,7 +22,7 @@ import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.server.impl.BlockRequestHandlingInjection;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Test;
@@ -33,7 +33,7 @@ import static 
org.apache.raft.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServerR
 
 public class TestRaftWithHadoopRpc extends RaftBasicTests {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java 
b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
index c0d751d..b5b8550 100644
--- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
+++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
@@ -29,7 +29,7 @@ import org.apache.raft.client.impl.ClientProtoUtils;
 import org.apache.raft.netty.NettyRpcProxy;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.server.impl.RaftServerRpc;
 import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
@@ -75,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc {
   }
 
   /** Constructs a netty server with the given port. */
-  public NettyRpcService(int port, RaftServer server) {
+  public NettyRpcService(int port, RaftServerImpl server) {
     this.raftService = new RequestDispatcher(server);
     this.id = server.getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java 
b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
index 4958e9a..12cdf13 100644
--- 
a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
+++ 
b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
@@ -26,7 +26,7 @@ import org.apache.raft.netty.server.NettyRpcService;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.impl.DelayLocalExecutionInjection;
 import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.util.NetUtils;
 
 import java.io.IOException;
@@ -69,17 +69,17 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
   }
 
   private static NettyRpcService newNettyRpcService(
-      RaftServer s, RaftConfiguration conf) {
+      RaftServerImpl s, RaftConfiguration conf) {
     final String address = getAddress(s.getId(), conf);
     final int port = NetUtils.newInetSocketAddress(address).getPort();
     return new NettyRpcService(port, s);
   }
 
   private static Map<RaftPeer, NettyRpcService> initRpcServices(
-      Collection<RaftServer> servers, RaftConfiguration conf) {
+      Collection<RaftServerImpl> servers, RaftConfiguration conf) {
     final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>();
 
-    for (RaftServer s : servers) {
+    for (RaftServerImpl s : servers) {
       final NettyRpcService rpc = newNettyRpcService(s, conf);
       peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
     }
@@ -88,8 +88,8 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
   }
 
   @Override
-  protected RaftServer setPeerRpc(RaftPeer peer) throws IOException {
-    final RaftServer s = servers.get(peer.getId());
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
+    final RaftServerImpl s = servers.get(peer.getId());
     final NettyRpcService rpc = newNettyRpcService(s, conf);
     s.setServerRpc(rpc);
     return s;
@@ -97,7 +97,7 @@ public class MiniRaftClusterWithNetty extends 
MiniRaftCluster.RpcBase {
 
   @Override
   protected Collection<RaftPeer> addNewPeers(
-      Collection<RaftPeer> newPeers, Collection<RaftServer> newServers,
+      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
       boolean startService) throws IOException {
     return addNewPeers(initRpcServices(newServers, conf),
         newServers, startService);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java 
b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
index fb75b7b..cba991d 100644
--- a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
+++ b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
@@ -21,7 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.server.impl.BlockRequestHandlingInjection;
-import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerImpl;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Test;
 
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 public class TestRaftWithNetty extends RaftBasicTests {
   static {
-    RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
index 61b3c92..20f2d8f 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java
@@ -25,15 +25,15 @@ import org.slf4j.Logger;
  * Used when the peer is a follower. Used to track the election timeout.
  */
 class FollowerState extends Daemon {
-  static final Logger LOG = RaftServer.LOG;
+  static final Logger LOG = RaftServerImpl.LOG;
 
-  private final RaftServer server;
+  private final RaftServerImpl server;
 
   private volatile Timestamp lastRpcTime = new Timestamp();
   private volatile boolean monitorRunning = true;
   private volatile boolean inLogSync = false;
 
-  FollowerState(RaftServer server) {
+  FollowerState(RaftServerImpl server) {
     this.server = server;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
index 10e901f..8552029 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java
@@ -67,7 +67,7 @@ class LeaderElection extends Daemon {
     }
   }
 
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private ExecutorCompletionService<RequestVoteReplyProto> service;
   private ExecutorService executor;
   private volatile boolean running;
@@ -79,7 +79,7 @@ class LeaderElection extends Daemon {
   private final RaftConfiguration conf;
   private final Collection<RaftPeer> others;
 
-  LeaderElection(RaftServer server) {
+  LeaderElection(RaftServerImpl server) {
     this.server = server;
     conf = server.getRaftConf();
     others = conf.getOtherPeers(server.getId());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
index 79d0d9e..39dc400 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java
@@ -49,7 +49,7 @@ import static 
org.apache.raft.server.impl.LeaderState.StateUpdateEventType.*;
  *                           corresponding log entries are committed
  */
 public class LeaderState {
-  private static final Logger LOG = RaftServer.LOG;
+  private static final Logger LOG = RaftServerImpl.LOG;
 
   enum StateUpdateEventType {
     STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
@@ -74,7 +74,7 @@ public class LeaderState {
   static final StateUpdateEvent STAGING_PROGRESS_EVENT =
       new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
 
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private final RaftLog raftLog;
   private final long currentTerm;
   private volatile ConfigurationStagingState stagingState;
@@ -94,7 +94,7 @@ public class LeaderState {
   private final int snapshotChunkMaxSize;
   private final int syncInterval;
 
-  LeaderState(RaftServer server, RaftProperties properties) {
+  LeaderState(RaftServerImpl server, RaftProperties properties) {
     this.server = server;
 
     stagingCatchupGap = properties.getInt(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
index 143da28..3b18f13 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java
@@ -46,9 +46,9 @@ import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
  * A daemon thread appending log entries to a follower peer.
  */
 public class LogAppender extends Daemon {
-  public static final Logger LOG = RaftServer.LOG;
+  public static final Logger LOG = RaftServerImpl.LOG;
 
-  protected final RaftServer server;
+  protected final RaftServerImpl server;
   private final LeaderState leaderState;
   protected final RaftLog raftLog;
   protected final FollowerInfo follower;
@@ -59,7 +59,7 @@ public class LogAppender extends Daemon {
 
   private volatile boolean sending = true;
 
-  public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo 
f) {
+  public LogAppender(RaftServerImpl server, LeaderState leaderState, 
FollowerInfo f) {
     this.follower = f;
     this.server = server;
     this.leaderState = leaderState;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
index b5ed775..d77faff 100644
--- 
a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
+++ 
b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java
@@ -18,13 +18,13 @@
 package org.apache.raft.server.impl;
 
 public interface LogAppenderFactory {
-  LogAppender getLogAppender(RaftServer server, LeaderState state,
-      FollowerInfo f);
+  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                             FollowerInfo f);
 
   class SynchronousLogAppenderFactory implements LogAppenderFactory {
     @Override
-    public LogAppender getLogAppender(RaftServer server, LeaderState state,
-        FollowerInfo f) {
+    public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
+                                      FollowerInfo f) {
       return new LogAppender(server, state, f);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
index ad6ecef..32f127e 100644
--- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
+++ b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java
@@ -30,14 +30,14 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 class PendingRequests {
-  private static final Logger LOG = RaftServer.LOG;
+  private static final Logger LOG = RaftServerImpl.LOG;
 
   private PendingRequest pendingSetConf;
-  private final RaftServer server;
+  private final RaftServerImpl server;
   private final ConcurrentMap<Long, PendingRequest> pendingRequests = new 
ConcurrentHashMap<>();
   private PendingRequest last = null;
 
-  PendingRequests(RaftServer server) {
+  PendingRequests(RaftServerImpl server) {
     this.server = server;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java 
b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
deleted file mode 100644
index c1bf4a9..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.concurrent.CompletableFuture;
-
-import static 
org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import static org.apache.raft.util.LifeCycle.State.*;
-
-public class RaftServer implements RaftServerProtocol, Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class);
-
-  private static final String CLASS_NAME = RaftServer.class.getSimpleName();
-  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
-  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
-  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
-
-
-  private final int minTimeoutMs;
-  private final int maxTimeoutMs;
-
-  private final LifeCycle lifeCycle;
-  private final ServerState state;
-  private final StateMachine stateMachine;
-  private final RaftProperties properties;
-  private volatile Role role;
-
-  /** used when the peer is follower, to monitor election timeout */
-  private volatile FollowerState heartbeatMonitor;
-
-  /** used when the peer is candidate, to request votes from other peers */
-  private volatile LeaderElection electionDaemon;
-
-  /** used when the peer is leader */
-  private volatile LeaderState leaderState;
-
-  private RaftServerRpc serverRpc;
-
-  private final LogAppenderFactory appenderFactory;
-
-  public RaftServer(String id, RaftConfiguration raftConf,
-      RaftProperties properties, StateMachine stateMachine) throws IOException 
{
-    this.lifeCycle = new LifeCycle(id);
-    minTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
-    maxTimeoutMs = properties.getInt(
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
-    Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
-        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
-    this.properties = properties;
-    this.stateMachine = stateMachine;
-    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
-    appenderFactory = initAppenderFactory();
-  }
-
-  public int getMinTimeoutMs() {
-    return minTimeoutMs;
-  }
-
-  public int getMaxTimeoutMs() {
-    return maxTimeoutMs;
-  }
-
-  public int getRandomTimeoutMs() {
-    return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
-  }
-
-  public StateMachine getStateMachine() {
-    return this.stateMachine;
-  }
-
-  public LogAppenderFactory getLogAppenderFactory() {
-    return appenderFactory;
-  }
-
-  private LogAppenderFactory initAppenderFactory() {
-    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
-        LogAppenderFactory.class);
-    return RaftUtils.newInstance(factoryClass);
-  }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port 
bindings.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration conf) {
-    this.state.setInitialConf(conf);
-  }
-
-  public void setServerRpc(RaftServerRpc serverRpc) {
-    this.serverRpc = serverRpc;
-    // add peers into rpc service
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null) {
-      addPeersToRPC(conf.getPeers());
-    }
-  }
-
-  public RaftServerRpc getServerRpc() {
-    return serverRpc;
-  }
-
-  public void start() {
-    lifeCycle.transition(STARTING);
-    state.start();
-    RaftConfiguration conf = getRaftConf();
-    if (conf != null && conf.contains(getId())) {
-      LOG.debug("{} starts as a follower", getId());
-      startAsFollower();
-    } else {
-      LOG.debug("{} starts with initializing state", getId());
-      startInitializing();
-    }
-  }
-
-  /**
-   * The peer belongs to the current configuration, should start as a follower
-   */
-  private void startAsFollower() {
-    role = Role.FOLLOWER;
-    heartbeatMonitor = new FollowerState(this);
-    heartbeatMonitor.start();
-
-    serverRpc.start();
-    lifeCycle.transition(RUNNING);
-  }
-
-  /**
-   * The peer does not have any configuration (maybe it will later be included
-   * in some configuration). Start still as a follower but will not vote or
-   * start election.
-   */
-  private void startInitializing() {
-    role = Role.FOLLOWER;
-    // do not start heartbeatMonitoring
-    serverRpc.start();
-  }
-
-  public ServerState getState() {
-    return this.state;
-  }
-
-  public String getId() {
-    return getState().getSelfId();
-  }
-
-  public RaftConfiguration getRaftConf() {
-    return getState().getRaftConf();
-  }
-
-  @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      try {
-        shutdownHeartbeatMonitor();
-        shutdownElectionDaemon();
-        shutdownLeaderState();
-
-        serverRpc.shutdown();
-        state.close();
-      } catch (Exception ignored) {
-        LOG.warn("Failed to kill " + state.getSelfId(), ignored);
-      }
-    });
-  }
-
-  public boolean isAlive() {
-    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
-  }
-
-  public boolean isFollower() {
-    return role == Role.FOLLOWER;
-  }
-
-  public boolean isCandidate() {
-    return role == Role.CANDIDATE;
-  }
-
-  public boolean isLeader() {
-    return role == Role.LEADER;
-  }
-
-  Role getRole() {
-    return role;
-  }
-
-  /**
-   * Change the server state to Follower if necessary
-   * @param newTerm The new term.
-   * @param sync We will call {@link ServerState#persistMetadata()} if this is
-   *             set to true and term/votedFor get updated.
-   * @return if the term/votedFor should be updated to the new term
-   * @throws IOException if term/votedFor persistence failed.
-   */
-  synchronized boolean changeToFollower(long newTerm, boolean sync)
-      throws IOException {
-    final Role old = role;
-    role = Role.FOLLOWER;
-
-    boolean metadataUpdated = false;
-    if (newTerm > state.getCurrentTerm()) {
-      state.setCurrentTerm(newTerm);
-      state.resetLeaderAndVotedFor();
-      metadataUpdated = true;
-    }
-
-    if (old == Role.LEADER) {
-      assert leaderState != null;
-      shutdownLeaderState();
-    } else if (old == Role.CANDIDATE) {
-      shutdownElectionDaemon();
-    }
-
-    if (old != Role.FOLLOWER) {
-      heartbeatMonitor = new FollowerState(this);
-      heartbeatMonitor.start();
-    }
-
-    if (metadataUpdated && sync) {
-      state.persistMetadata();
-    }
-    return metadataUpdated;
-  }
-
-  private synchronized void shutdownLeaderState() {
-    final LeaderState leader = leaderState;
-    if (leader != null) {
-      leader.stop();
-    }
-    leaderState = null;
-    // TODO: make sure that StateMachineUpdater has applied all transactions 
that have context
-  }
-
-  private void shutdownElectionDaemon() {
-    final LeaderElection election = electionDaemon;
-    if (election != null) {
-      election.stopRunning();
-      // no need to interrupt the election thread
-    }
-    electionDaemon = null;
-  }
-
-  synchronized void changeToLeader() {
-    Preconditions.checkState(isCandidate());
-    shutdownElectionDaemon();
-    role = Role.LEADER;
-    state.becomeLeader();
-    // start sending AppendEntries RPC to followers
-    leaderState = new LeaderState(this, properties);
-    leaderState.start();
-  }
-
-  private void shutdownHeartbeatMonitor() {
-    final FollowerState hm = heartbeatMonitor;
-    if (hm != null) {
-      hm.stopRunning();
-      hm.interrupt();
-    }
-    heartbeatMonitor = null;
-  }
-
-  synchronized void changeToCandidate() {
-    Preconditions.checkState(isFollower());
-    shutdownHeartbeatMonitor();
-    role = Role.CANDIDATE;
-    // start election
-    electionDaemon = new LeaderElection(this);
-    electionDaemon.start();
-  }
-
-  @Override
-  public String toString() {
-    return role + " " + state + " " + lifeCycle.getCurrentState();
-  }
-
-  /**
-   * @return null if the server is in leader state.
-   */
-  CompletableFuture<RaftClientReply> checkLeaderState(
-      RaftClientRequest request) {
-    if (!isLeader()) {
-      NotLeaderException exception = generateNotLeaderException();
-      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
-      future.complete(new RaftClientReply(request, exception));
-      return future;
-    }
-    return null;
-  }
-
-  NotLeaderException generateNotLeaderException() {
-    if (lifeCycle.getCurrentState() != RUNNING) {
-      return new NotLeaderException(getId(), null, null);
-    }
-    String leaderId = state.getLeaderId();
-    if (leaderId == null || leaderId.equals(state.getSelfId())) {
-      // No idea about who is the current leader. Or the peer is the current
-      // leader, but it is about to step down
-      RaftPeer suggestedLeader = state.getRaftConf()
-          .getRandomPeer(state.getSelfId());
-      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
-    }
-    RaftConfiguration conf = getRaftConf();
-    Collection<RaftPeer> peers = conf.getPeers();
-    return new NotLeaderException(getId(), conf.getPeer(leaderId),
-        peers.toArray(new RaftPeer[peers.size()]));
-  }
-
-  /**
-   * Handle a normal update request from client.
-   */
-  public CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContext entry)
-      throws RaftException {
-    LOG.debug("{}: receive client request({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply;
-
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      // append the message to its local log
-      final long entryIndex;
-      try {
-        entryIndex = state.applyLog(entry);
-      } catch (IOException e) {
-        throw new RaftException(e);
-      }
-
-      // put the request into the pending queue
-      pending = leaderState.addPendingRequest(entryIndex, request, entry);
-      leaderState.notifySenders();
-    }
-    return pending.getFuture();
-  }
-
-  /**
-   * Handle a raft configuration change request from client.
-   */
-  public CompletableFuture<RaftClientReply> setConfiguration(
-      SetConfigurationRequest request) throws IOException {
-    LOG.debug("{}: receive setConfiguration({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
-    if (reply != null) {
-      return reply;
-    }
-
-    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
-    final PendingRequest pending;
-    synchronized (this) {
-      reply = checkLeaderState(request);
-      if (reply != null) {
-        return reply;
-      }
-
-      final RaftConfiguration current = getRaftConf();
-      // make sure there is no other raft reconfiguration in progress
-      if (!current.isStable() || leaderState.inStagingState() ||
-          !state.isCurrentConfCommitted()) {
-        throw new ReconfigurationInProgressException(
-            "Reconfiguration is already in progress: " + current);
-      }
-
-      // return true if the new configuration is the same with the current one
-      if (current.hasNoChange(peersInNewConf)) {
-        pending = leaderState.returnNoConfChange(request);
-        return pending.getFuture();
-      }
-
-      // add new peers into the rpc service
-      addPeersToRPC(Arrays.asList(peersInNewConf));
-      // add staging state into the leaderState
-      pending = leaderState.startSetConfiguration(request);
-    }
-    return pending.getFuture();
-  }
-
-  private boolean shouldWithholdVotes() {
-    return isLeader() || (isFollower() && state.hasLeader()
-        && heartbeatMonitor.shouldWithholdVotes());
-  }
-
-  /**
-   * check if the remote peer is not included in the current conf
-   * and should shutdown. should shutdown if all the following stands:
-   * 1. this is a leader
-   * 2. current conf is stable and has been committed
-   * 3. candidate id is not included in conf
-   * 4. candidate's last entry's index < conf's index
-   */
-  private boolean shouldSendShutdown(String candidateId,
-      TermIndex candidateLastEntry) {
-    return isLeader()
-        && getRaftConf().isStable()
-        && getState().isConfCommitted()
-        && !getRaftConf().containsInConf(candidateId)
-        && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
-        && !leaderState.isBootStrappingPeer(candidateId);
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
-    final String candidateId = r.getServerRequest().getRequestorId();
-    return requestVote(candidateId, r.getCandidateTerm(),
-        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
-  }
-
-  private RequestVoteReplyProto requestVote(String candidateId,
-      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
-    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
-        candidateId, candidateTerm, candidateLastEntry);
-    LOG.debug("{}: receive requestVote({}, {}, {})",
-        getId(), candidateId, candidateTerm, candidateLastEntry);
-    lifeCycle.assertCurrentState(RUNNING);
-
-    boolean voteGranted = false;
-    boolean shouldShutdown = false;
-    final RequestVoteReplyProto reply;
-    synchronized (this) {
-      if (shouldWithholdVotes()) {
-        LOG.info("{} Withhold vote from server {} with term {}. " +
-            "This server:{}, last rpc time from leader {} is {}", getId(),
-            candidateId, candidateTerm, this, this.getState().getLeaderId(),
-            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
-      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        boolean termUpdated = changeToFollower(candidateTerm, false);
-        // see Section 5.4.1 Election restriction
-        if (state.isLogUpToDate(candidateLastEntry)) {
-          heartbeatMonitor.updateLastRpcTime(false);
-          state.grantVote(candidateId);
-          voteGranted = true;
-        }
-        if (termUpdated || voteGranted) {
-          state.persistMetadata(); // sync metafile
-        }
-      }
-      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) 
{
-        shouldShutdown = true;
-      }
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
-            getId(), ProtoUtils.toString(reply), state);
-      }
-    }
-    return reply;
-  }
-
-  private void validateEntries(long expectedTerm, TermIndex previous,
-      LogEntryProto... entries) {
-    if (entries != null && entries.length > 0) {
-      final long index0 = entries[0].getIndex();
-
-      if (previous == null || previous.getTerm() == 0) {
-        Preconditions.checkArgument(index0 == 0,
-            "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
-            0, index0);
-      } else {
-        Preconditions.checkArgument(previous.getIndex() == index0 - 1,
-            "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
-            previous, 0, index0);
-      }
-
-      for (int i = 0; i < entries.length; i++) {
-        final long t = entries[i].getTerm();
-        Preconditions.checkArgument(expectedTerm >= t,
-            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
-            i, t, expectedTerm);
-
-        final long indexi = entries[i].getIndex();
-        Preconditions.checkArgument(indexi == index0 + i,
-            "Unexpected Index: entries[%s].getIndex()=%s but 
entries[0].getIndex()=%s",
-            i, indexi, index0);
-      }
-    }
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
-      throws IOException {
-    // TODO avoid converting list to array
-    final LogEntryProto[] entries = r.getEntriesList()
-        .toArray(new LogEntryProto[r.getEntriesCount()]);
-    final TermIndex previous = r.hasPreviousLog() ?
-        ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
-    return appendEntries(r.getServerRequest().getRequestorId(),
-        r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
-        entries);
-  }
-
-  private AppendEntriesReplyProto appendEntries(String leaderId, long 
leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing,
-      LogEntryProto... entries) throws IOException {
-    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
-        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
-          leaderId, leaderTerm, previous, leaderCommit, initializing,
-          ServerProtoUtils.toString(entries));
-    }
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    try {
-      validateEntries(leaderTerm, previous, entries);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    }
-
-    final long currentTerm;
-    long nextIndex = state.getLog().getNextIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: do not recognize leader. Reply: {}",
-              getId(), ProtoUtils.toString(reply));
-        }
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        heartbeatMonitor = new FollowerState(this);
-        heartbeatMonitor.start();
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // We need to check if "previous" is in the local peer. Note that it is
-      // possible that "previous" is covered by the latest snapshot: e.g.,
-      // it's possible there's no log entries outside of the latest snapshot.
-      // However, it is not possible that "previous" index is smaller than the
-      // last index included in snapshot. This is because indices <= snapshot's
-      // last index should have been committed.
-      if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
-                currentTerm, Math.min(nextIndex, previous.getIndex()), 
INCONSISTENCY);
-        LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
-            getId(), previous, ServerProtoUtils.toString(reply));
-        return reply;
-      }
-
-      state.getLog().append(entries);
-      state.updateConfiguration(entries);
-      state.updateStatemachine(leaderCommit, currentTerm);
-    }
-    if (entries != null && entries.length > 0) {
-      try {
-        state.getLog().logSync();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException("logSync got interrupted");
-      }
-      nextIndex = entries[entries.length - 1].getIndex() + 1;
-    }
-    synchronized (this) {
-      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-          && getState().getCurrentTerm() == currentTerm) {
-        // reset election timer to avoid punishing the leader for our own
-        // long disk writes
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
-    LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
-        ServerProtoUtils.toString(reply));
-    return reply;
-  }
-
-  private boolean containPrevious(TermIndex previous) {
-    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
-        getId(), previous, state.getLatestSnapshot(), 
state.getLatestInstalledSnapshot());
-    return state.getLog().contains(previous)
-        ||  (state.getLatestSnapshot() != null
-             && state.getLatestSnapshot().getTermIndex().equals(previous))
-        || (state.getLatestInstalledSnapshot() != null)
-             && state.getLatestInstalledSnapshot().equals(previous);
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    final String leaderId = request.getServerRequest().getRequestorId();
-    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, 
request);
-    LOG.debug("{}: receive installSnapshot({})", getId(), request);
-
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    final long currentTerm;
-    final long leaderTerm = request.getLeaderTerm();
-    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
-    final long lastIncludedIndex = lastTermIndex.getIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final InstallSnapshotReplyProto reply = ServerProtoUtils
-            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
-                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
-        LOG.debug("{}: do not recognize leader for installing snapshot." +
-            " Reply: {}", getId(), reply);
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // Check and append the snapshot chunk. We simply put this in lock
-      // considering a follower peer requiring a snapshot installation does not
-      // have a lot of requests
-      Preconditions.checkState(
-          state.getLog().getNextIndex() <= lastIncludedIndex,
-          "%s log's next id is %s, last included index in snapshot is %s",
-          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
-
-      //TODO: We should only update State with installed snapshot once the 
request is done.
-      state.installSnapshot(request);
-
-      // update the committed index
-      // re-load the state machine if this is the last chunk
-      if (request.getDone()) {
-        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    if (request.getDone()) {
-      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
-          lastIncludedIndex);
-    }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
-        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
-  }
-
-  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
-      String targetId, TermIndex previous, List<LogEntryProto> entries,
-      boolean initializing) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
-        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
-        initializing, previous);
-  }
-
-  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
-      String targetId, String requestId, int requestIndex, SnapshotInfo 
snapshot,
-      List<FileChunkProto> chunks, boolean done) {
-    OptionalLong totalSize = snapshot.getFiles().stream()
-        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
-    assert totalSize.isPresent();
-    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
-        requestId, requestIndex, state.getCurrentTerm(), 
snapshot.getTermIndex(),
-        chunks, totalSize.getAsLong(), done);
-  }
-
-  synchronized RequestVoteRequestProto createRequestVoteRequest(String 
targetId,
-      long term, TermIndex lastEntry) {
-    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
-        lastEntry);
-  }
-
-  public synchronized void submitLocalSyncEvent() {
-    if (isLeader() && leaderState != null) {
-      leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
-    }
-  }
-
-  public void addPeersToRPC(Iterable<RaftPeer> peers) {
-    serverRpc.addPeers(peers);
-  }
-
-  synchronized void replyPendingRequest(long logIndex,
-      CompletableFuture<Message> message) {
-    if (isLeader() && leaderState != null) { // is leader and is running
-      leaderState.replyPendingRequest(logIndex, message);
-    }
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    if (leaderState != null) { // is leader and is running
-      return leaderState.getTransactionContext(index);
-    }
-    return null;
-  }
-
-  public RaftProperties getProperties() {
-    return this.properties;
-  }
-}

Reply via email to