Move o.a.r.s.* to o.a.r.s.impl.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c36810ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c36810ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c36810ed

Branch: refs/heads/master
Commit: c36810ed8dbd0e202f98ba57bd5730e7edff6863
Parents: dd0b631
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Sat Dec 31 17:23:13 2016 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Sat Dec 31 17:23:13 2016 +0800

----------------------------------------------------------------------
 .../arithmetic/ArithmeticStateMachine.java      |   2 +-
 .../java/org/apache/raft/TestBatchAppend.java   |   2 +-
 .../org/apache/raft/TestRestartRaftPeer.java    |   2 +-
 .../TestRaftStateMachineException.java          |   2 +-
 .../org/apache/raft/grpc/RaftGRpcService.java   |   6 +-
 .../grpc/client/RaftClientProtocolService.java  |   2 +-
 .../raft/grpc/server/GRpcLogAppender.java       |   8 +-
 .../server/PipelinedLogAppenderFactory.java     |  10 +-
 .../grpc/server/RaftServerProtocolService.java  |   2 +-
 .../raft/grpc/MiniRaftClusterWithGRpc.java      |   8 +-
 .../grpc/TestNotLeaderExceptionWithGrpc.java    |   2 +-
 .../grpc/TestRaftReconfigurationWithGRpc.java   |   4 +-
 .../org/apache/raft/grpc/TestRaftStream.java    |   4 +-
 .../org/apache/raft/grpc/TestRaftWithGrpc.java  |   6 +-
 .../raft/hadooprpc/server/HadoopRpcService.java |   3 +
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |   4 +-
 .../TestRaftReconfigurationWithHadoopRpc.java   |   2 +-
 .../raft/hadooprpc/TestRaftWithHadoopRpc.java   |   4 +-
 .../raft/netty/server/NettyRpcService.java      |   6 +-
 .../raft/netty/MiniRaftClusterWithNetty.java    |   7 +-
 .../netty/TestRaftReconfigurationWithNetty.java |   2 +-
 .../apache/raft/netty/TestRaftWithNetty.java    |   4 +-
 .../raft/server/ConfigurationManager.java       |  91 ---
 .../org/apache/raft/server/FollowerInfo.java    | 103 ---
 .../org/apache/raft/server/FollowerState.java   |  91 ---
 .../org/apache/raft/server/LeaderElection.java  | 242 ------
 .../org/apache/raft/server/LeaderState.java     | 581 --------------
 .../org/apache/raft/server/LogAppender.java     | 481 ------------
 .../apache/raft/server/LogAppenderFactory.java  |  31 -
 .../apache/raft/server/PeerConfiguration.java   |  90 ---
 .../org/apache/raft/server/PendingRequest.java  |  87 ---
 .../org/apache/raft/server/PendingRequests.java | 134 ----
 .../apache/raft/server/RaftConfiguration.java   | 261 -------
 .../java/org/apache/raft/server/RaftServer.java | 750 -------------------
 .../raft/server/RaftServerConfigKeys.java       |   1 +
 .../apache/raft/server/RaftServerConstants.java |  46 --
 .../org/apache/raft/server/RaftServerRpc.java   |  44 --
 .../apache/raft/server/RequestDispatcher.java   | 137 ----
 .../main/java/org/apache/raft/server/Role.java  |  25 -
 .../org/apache/raft/server/ServerState.java     | 346 ---------
 .../apache/raft/server/StateMachineUpdater.java | 213 ------
 .../raft/server/impl/ConfigurationManager.java  |  91 +++
 .../apache/raft/server/impl/FollowerInfo.java   | 103 +++
 .../apache/raft/server/impl/FollowerState.java  |  91 +++
 .../apache/raft/server/impl/LeaderElection.java | 241 ++++++
 .../apache/raft/server/impl/LeaderState.java    | 581 ++++++++++++++
 .../apache/raft/server/impl/LogAppender.java    | 480 ++++++++++++
 .../raft/server/impl/LogAppenderFactory.java    |  31 +
 .../raft/server/impl/PeerConfiguration.java     |  90 +++
 .../apache/raft/server/impl/PendingRequest.java |  87 +++
 .../raft/server/impl/PendingRequests.java       | 129 ++++
 .../raft/server/impl/RaftConfiguration.java     | 261 +++++++
 .../org/apache/raft/server/impl/RaftServer.java | 749 ++++++++++++++++++
 .../raft/server/impl/RaftServerConstants.java   |  46 ++
 .../apache/raft/server/impl/RaftServerRpc.java  |  44 ++
 .../raft/server/impl/RequestDispatcher.java     | 137 ++++
 .../java/org/apache/raft/server/impl/Role.java  |  25 +
 .../raft/server/impl/ServerProtoUtils.java      |   3 +-
 .../apache/raft/server/impl/ServerState.java    | 345 +++++++++
 .../raft/server/impl/StateMachineUpdater.java   | 213 ++++++
 .../raft/server/storage/LogInputStream.java     |   2 +-
 .../raft/server/storage/LogOutputStream.java    |   2 +-
 .../apache/raft/server/storage/LogReader.java   |   2 +-
 .../apache/raft/server/storage/LogSegment.java  |   2 +-
 .../raft/server/storage/MemoryRaftLog.java      |   4 +-
 .../org/apache/raft/server/storage/RaftLog.java |   6 +-
 .../raft/server/storage/RaftLogCache.java       |   4 +-
 .../raft/server/storage/RaftLogWorker.java      |   4 +-
 .../apache/raft/server/storage/RaftStorage.java |   2 +-
 .../server/storage/RaftStorageDirectory.java    |   2 +-
 .../raft/server/storage/SegmentedRaftLog.java   |   6 +-
 .../raft/statemachine/BaseStateMachine.java     |   4 +-
 .../statemachine/SimpleStateMachineStorage.java |   2 +-
 .../apache/raft/statemachine/SnapshotInfo.java  |   2 +-
 .../raft/statemachine/SnapshotInfoImpl.java     |   2 +-
 .../apache/raft/statemachine/StateMachine.java  |  12 +-
 .../raft/statemachine/TermIndexTracker.java     |   2 +-
 .../java/org/apache/raft/MiniRaftCluster.java   |   8 +-
 .../java/org/apache/raft/RaftBasicTests.java    |   8 +-
 .../raft/RaftNotLeaderExceptionBaseTest.java    |  10 +-
 .../test/java/org/apache/raft/RaftTestUtil.java |   6 +-
 .../server/BlockRequestHandlingInjection.java   |  84 ---
 .../server/DelayLocalExecutionInjection.java    |  67 --
 .../server/RaftReconfigurationBaseTest.java     | 576 --------------
 .../apache/raft/server/RaftServerTestUtil.java  |  67 --
 .../impl/BlockRequestHandlingInjection.java     |  84 +++
 .../impl/DelayLocalExecutionInjection.java      |  67 ++
 .../impl/RaftReconfigurationBaseTest.java       | 577 ++++++++++++++
 .../raft/server/impl/RaftServerTestUtil.java    |  67 ++
 .../MiniRaftClusterWithSimulatedRpc.java        |   2 +-
 .../server/simulation/SimulatedServerRpc.java   |   6 +-
 ...TestRaftReconfigurationWithSimulatedRpc.java |   2 +-
 .../simulation/TestRaftWithSimulatedRpc.java    |   2 +-
 .../server/storage/TestRaftLogReadWrite.java    |   4 +-
 .../raft/server/storage/TestRaftLogSegment.java |   8 +-
 .../raft/server/storage/TestRaftStorage.java    |   2 +-
 .../server/storage/TestSegmentedRaftLog.java    |   4 +-
 .../raft/statemachine/RaftSnapshotBaseTest.java |   6 +-
 .../SimpleStateMachine4Testing.java             |   2 +-
 .../raft/statemachine/TestStateMachine.java     |   2 +-
 100 files changed, 4651 insertions(+), 4663 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
 
b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
index 4f038d2..b684669 100644
--- 
a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
+++ 
b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
@@ -22,7 +22,7 @@ import 
org.apache.raft.examples.arithmetic.expression.Expression;
 import org.apache.raft.protocol.Message;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.server.RaftServerConstants;
+import org.apache.raft.server.impl.RaftServerConstants;
 import org.apache.raft.server.protocol.TermIndex;
 import org.apache.raft.server.storage.RaftStorage;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
----------------------------------------------------------------------
diff --git a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java 
b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
index 396bd47..c730245 100644
--- a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
+++ b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java
@@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.statemachine.SimpleStateMachine4Testing;
 import org.apache.raft.statemachine.StateMachine;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java 
b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
index 10741f9..fadd7a8 100644
--- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
+++ b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java
@@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage;
 import org.apache.raft.client.RaftClient;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.examples.RaftExamplesTestUtil;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
 
b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
index 61d7989..2fa6e53 100644
--- 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
+++ 
b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java
@@ -24,7 +24,7 @@ import org.apache.raft.client.RaftClient;
 import org.apache.raft.examples.RaftExamplesTestUtil;
 import org.apache.raft.protocol.Message;
 import org.apache.raft.protocol.StateMachineException;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.simulation.RequestHandler;
 import org.apache.raft.server.storage.RaftLog;
 import org.apache.raft.util.RaftUtils;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
index 8259083..c9a0daf 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java
@@ -23,9 +23,9 @@ import org.apache.raft.grpc.client.RaftClientProtocolService;
 import org.apache.raft.grpc.server.RaftServerProtocolClient;
 import org.apache.raft.grpc.server.RaftServerProtocolService;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.RequestDispatcher;
+import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerRpc;
+import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.Server;
 import org.apache.raft.shaded.io.grpc.ServerBuilder;
 import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
index d8704f6..32dbac7 100644
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
+++ 
b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.raft.client.impl.ClientProtoUtils;
 import org.apache.raft.grpc.RaftGrpcUtil;
 import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.server.RequestDispatcher;
+import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
index 2f1fc90..b171bed 100644
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
+++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
@@ -20,10 +20,10 @@ package org.apache.raft.grpc.server;
 import com.google.common.base.Preconditions;
 import org.apache.raft.grpc.RaftGRpcService;
 import org.apache.raft.grpc.RaftGrpcConfigKeys;
-import org.apache.raft.server.FollowerInfo;
-import org.apache.raft.server.LeaderState;
-import org.apache.raft.server.LogAppender;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.FollowerInfo;
+import org.apache.raft.server.impl.LeaderState;
+import org.apache.raft.server.impl.LogAppender;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.shaded.io.grpc.Status;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
index 90882df..5f01980 100644
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
+++ 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
@@ -17,11 +17,11 @@
  */
 package org.apache.raft.grpc.server;
 
-import org.apache.raft.server.FollowerInfo;
-import org.apache.raft.server.LeaderState;
-import org.apache.raft.server.LogAppender;
-import org.apache.raft.server.LogAppenderFactory;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.FollowerInfo;
+import org.apache.raft.server.impl.LeaderState;
+import org.apache.raft.server.impl.LogAppender;
+import org.apache.raft.server.impl.LogAppenderFactory;
+import org.apache.raft.server.impl.RaftServer;
 
 public class PipelinedLogAppenderFactory implements LogAppenderFactory {
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
index d0fba89..2f06c59 100644
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
+++ 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
@@ -18,7 +18,7 @@
 package org.apache.raft.grpc.server;
 
 import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.server.RequestDispatcher;
+import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
 import org.apache.raft.shaded.proto.RaftProtos.*;
 import 
org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
index 8316db4..420ee88 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
@@ -25,10 +25,10 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.grpc.client.RaftClientSenderWithGrpc;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.BlockRequestHandlingInjection;
-import org.apache.raft.server.DelayLocalExecutionInjection;
-import org.apache.raft.server.LogAppenderFactory;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.BlockRequestHandlingInjection;
+import org.apache.raft.server.impl.DelayLocalExecutionInjection;
+import org.apache.raft.server.impl.LogAppenderFactory;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.util.NetUtils;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
index 18f9751..a8357c9 100644
--- 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
+++ 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
@@ -21,7 +21,7 @@ import org.apache.raft.MiniRaftCluster;
 import org.apache.raft.RaftNotLeaderExceptionBaseTest;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.LogAppenderFactory;
+import org.apache.raft.server.impl.LogAppenderFactory;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
index 7912858..83e6c62 100644
--- 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
+++ 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
@@ -20,8 +20,8 @@ package org.apache.raft.grpc;
 import org.apache.log4j.Level;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.raft.grpc.server.RaftServerProtocolService;
-import org.apache.raft.server.LogAppenderFactory;
-import org.apache.raft.server.RaftReconfigurationBaseTest;
+import org.apache.raft.server.impl.LogAppenderFactory;
+import org.apache.raft.server.impl.RaftReconfigurationBaseTest;
 import org.apache.raft.util.RaftUtils;
 import org.junit.BeforeClass;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
index 981ac5d..0c85854 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
@@ -23,8 +23,8 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.grpc.client.AppendStreamer;
 import org.apache.raft.grpc.client.RaftOutputStream;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.LogAppenderFactory;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.LogAppenderFactory;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.storage.RaftLog;
 import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.raft.util.RaftUtils;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
index 083948a..b2d104b 100644
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
+++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
@@ -20,9 +20,9 @@ package org.apache.raft.grpc;
 import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.BlockRequestHandlingInjection;
-import org.apache.raft.server.LogAppenderFactory;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.BlockRequestHandlingInjection;
+import org.apache.raft.server.impl.LogAppenderFactory;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
index 3c22f12..ad4beec 100644
--- 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
+++ 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java
@@ -26,6 +26,9 @@ import org.apache.raft.hadooprpc.client.RaftClientProtocolPB;
 import 
org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
 import org.apache.raft.protocol.RaftPeer;
 import org.apache.raft.server.*;
+import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerRpc;
+import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.com.google.protobuf.BlockingService;
 import org.apache.raft.shaded.com.google.protobuf.ServiceException;
 import org.apache.raft.shaded.proto.RaftProtos.*;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index ed3da7f..7f7ef49 100644
--- 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -26,8 +26,8 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.hadooprpc.client.HadoopClientRequestSender;
 import org.apache.raft.hadooprpc.server.HadoopRpcService;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.DelayLocalExecutionInjection;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.DelayLocalExecutionInjection;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
index fef13d1..0116280 100644
--- 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
+++ 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
@@ -20,7 +20,7 @@ package org.apache.raft.hadooprpc;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.raft.MiniRaftCluster;
 import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.RaftReconfigurationBaseTest;
+import org.apache.raft.server.impl.RaftReconfigurationBaseTest;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
index e63b119..3971274 100644
--- 
a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
+++ 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java
@@ -21,8 +21,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.client.RaftClient;
-import org.apache.raft.server.BlockRequestHandlingInjection;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.BlockRequestHandlingInjection;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.server.RaftServerConfigKeys;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java 
b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
index 9c728ce..c0d751d 100644
--- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
+++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java
@@ -29,9 +29,9 @@ import org.apache.raft.client.impl.ClientProtoUtils;
 import org.apache.raft.netty.NettyRpcProxy;
 import org.apache.raft.protocol.RaftClientReply;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.RequestDispatcher;
+import org.apache.raft.server.impl.RaftServer;
+import org.apache.raft.server.impl.RaftServerRpc;
+import org.apache.raft.server.impl.RequestDispatcher;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
 import 
org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java 
b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
index 3ccb335..4958e9a 100644
--- 
a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
+++ 
b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java
@@ -24,11 +24,10 @@ import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.netty.client.NettyClientRequestSender;
 import org.apache.raft.netty.server.NettyRpcService;
 import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.DelayLocalExecutionInjection;
-import org.apache.raft.server.RaftConfiguration;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.DelayLocalExecutionInjection;
+import org.apache.raft.server.impl.RaftConfiguration;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.util.NetUtils;
-import org.apache.raft.util.RaftUtils;
 
 import java.io.IOException;
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java
 
b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java
index 17e0a21..c4dd914 100644
--- 
a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java
+++ 
b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java
@@ -18,7 +18,7 @@
 package org.apache.raft.netty;
 
 import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.server.RaftReconfigurationBaseTest;
+import org.apache.raft.server.impl.RaftReconfigurationBaseTest;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
----------------------------------------------------------------------
diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java 
b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
index 916c374..fb75b7b 100644
--- a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
+++ b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java
@@ -20,8 +20,8 @@ package org.apache.raft.netty;
 import org.apache.log4j.Level;
 import org.apache.raft.RaftBasicTests;
 import org.apache.raft.client.RaftClient;
-import org.apache.raft.server.BlockRequestHandlingInjection;
-import org.apache.raft.server.RaftServer;
+import org.apache.raft.server.impl.BlockRequestHandlingInjection;
+import org.apache.raft.server.impl.RaftServer;
 import org.apache.raft.util.RaftUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java 
b/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java
deleted file mode 100644
index 405cf2f..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.util.*;
-
-/**
- * Maintain the mappings between log index and corresponding raft 
configuration.
- * Initialized when starting the raft peer. The mappings are loaded from the
- * raft log, and updated while appending/truncating configuration related log
- * entries.
- */
-public class ConfigurationManager {
-  private RaftConfiguration initialConf;
-  private final NavigableMap<Long, RaftConfiguration> configurations =
-      new TreeMap<>();
-  /**
-   * The current raft configuration. If configurations is not empty, should be
-   * the last entry of the map. Otherwise is initialConf.
-   */
-  private RaftConfiguration currentConf;
-
-  public ConfigurationManager(RaftConfiguration initialConf) {
-    setInitialConf(initialConf);
-  }
-
-  @VisibleForTesting
-  public synchronized void setInitialConf(RaftConfiguration initialConf) {
-    /**
-     * initialConf should actually be defined as "final". But for tests we want
-     * to change the initial configuration to reflect the correct port binding.
-     */
-    this.initialConf = initialConf;
-    this.currentConf = initialConf;
-  }
-
-  public synchronized void addConfiguration(long logIndex,
-      RaftConfiguration conf) {
-    Preconditions.checkArgument(configurations.isEmpty() ||
-        configurations.lastEntry().getKey() < logIndex);
-    configurations.put(logIndex, conf);
-    this.currentConf = conf;
-  }
-
-  synchronized RaftConfiguration getCurrent() {
-    return currentConf;
-  }
-
-  /**
-   * Remove all the configurations whose log index is >= the given index.
-   * @param index The given index. All the configurations whose log index is >=
-   *              this value will be removed.
-   * @return The configuration with largest log index < the given index.
-   */
-  synchronized RaftConfiguration removeConfigurations(long index) {
-    SortedMap<Long, RaftConfiguration> toRemove = 
configurations.tailMap(index);
-    for (Iterator<Map.Entry<Long, RaftConfiguration>> iter =
-         toRemove.entrySet().iterator(); iter.hasNext();) {
-      iter.next();
-      iter.remove();
-    }
-    currentConf = configurations.isEmpty() ? initialConf :
-        configurations.lastEntry().getValue();
-    return currentConf;
-  }
-
-  @VisibleForTesting
-  synchronized int numOfConf() {
-    return 1 + configurations.size();
-  }
-
-  // TODO: remove Configuration entries after they are committed
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java 
b/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java
deleted file mode 100644
index 8e5c131..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.util.Timestamp;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class FollowerInfo {
-  private final RaftPeer peer;
-  private final AtomicReference<Timestamp> lastRpcResponseTime;
-  private final AtomicReference<Timestamp> lastRpcSendTime;
-  private long nextIndex;
-  private final AtomicLong matchIndex;
-  private volatile boolean attendVote;
-
-  FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
-      boolean attendVote) {
-    this.peer = peer;
-    this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
-    this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
-    this.nextIndex = nextIndex;
-    this.matchIndex = new AtomicLong(0);
-    this.attendVote = attendVote;
-  }
-
-  public void updateMatchIndex(final long matchIndex) {
-    this.matchIndex.set(matchIndex);
-  }
-
-  long getMatchIndex() {
-    return matchIndex.get();
-  }
-
-  public synchronized long getNextIndex() {
-    return nextIndex;
-  }
-
-  public synchronized void updateNextIndex(long i) {
-    nextIndex = i;
-  }
-
-  public synchronized void decreaseNextIndex(long targetIndex) {
-    if (nextIndex > 0) {
-      nextIndex = Math.min(nextIndex - 1, targetIndex);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return peer.getId() + "(next=" + nextIndex + ", match=" + matchIndex + "," 
+
-        " attendVote=" + attendVote +
-        ", lastRpcSendTime=" + lastRpcSendTime +
-        ", lastRpcResponseTime=" + lastRpcResponseTime + ")";
-  }
-
-  void startAttendVote() {
-    attendVote = true;
-  }
-
-  public boolean isAttendingVote() {
-    return attendVote;
-  }
-
-  public RaftPeer getPeer() {
-    return peer;
-  }
-
-  /** Update lastRpcResponseTime to the current time. */
-  public void updateLastRpcResponseTime() {
-    lastRpcResponseTime.set(new Timestamp());
-  }
-
-  public Timestamp getLastRpcResponseTime() {
-    return lastRpcResponseTime.get();
-  }
-
-  /** Update lastRpcSendTime to the current time. */
-  public void updateLastRpcSendTime() {
-    lastRpcSendTime.set(new Timestamp());
-  }
-
-  public Timestamp getLastRpcTime() {
-    return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/FollowerState.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/FollowerState.java 
b/raft-server/src/main/java/org/apache/raft/server/FollowerState.java
deleted file mode 100644
index e5293e4..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/FollowerState.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-/**
- * Used when the peer is a follower. Used to track the election timeout.
- */
-class FollowerState extends Daemon {
-  static final Logger LOG = RaftServer.LOG;
-
-  private final RaftServer server;
-
-  private volatile Timestamp lastRpcTime = new Timestamp();
-  private volatile boolean monitorRunning = true;
-  private volatile boolean inLogSync = false;
-
-  FollowerState(RaftServer server) {
-    this.server = server;
-  }
-
-  void updateLastRpcTime(boolean inLogSync) {
-    lastRpcTime = new Timestamp();
-    LOG.trace("{} update last rpc time to {}", server.getId(), lastRpcTime);
-    this.inLogSync = inLogSync;
-  }
-
-  Timestamp getLastRpcTime() {
-    return lastRpcTime;
-  }
-
-  boolean shouldWithholdVotes() {
-    return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs();
-  }
-
-  void stopRunning() {
-    this.monitorRunning = false;
-  }
-
-  @Override
-  public  void run() {
-    while (monitorRunning && server.isFollower()) {
-      final long electionTimeout = server.getRandomTimeoutMs();
-      try {
-        Thread.sleep(electionTimeout);
-        if (!monitorRunning || !server.isFollower()) {
-          LOG.info("{} heartbeat monitor quit", server.getId());
-          break;
-        }
-        synchronized (server) {
-          if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
-            LOG.info("{} changes to {}, lastRpcTime:{}, electionTimeout:{}",
-                server.getId(), Role.CANDIDATE, lastRpcTime, electionTimeout);
-            // election timeout, should become a candidate
-            server.changeToCandidate();
-            break;
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.info(this + " was interrupted: " + e);
-        LOG.trace("TRACE", e);
-        return;
-      } catch (Exception e) {
-        LOG.warn(this + " caught an exception", e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return server.getId() + ": " + getClass().getSimpleName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java 
b/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java
deleted file mode 100644
index 8a97494..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.shaded.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-
-class LeaderElection extends Daemon {
-  public static final Logger LOG = 
LoggerFactory.getLogger(LeaderElection.class);
-
-  private ResultAndTerm logAndReturn(Result result,
-      List<RequestVoteReplyProto> responses,
-      List<Exception> exceptions, long newTerm) {
-    LOG.info(server.getId() + ": Election " + result + "; received "
-        + responses.size() + " response(s) "
-        + responses.stream().map(r -> 
ProtoUtils.toString(r)).collect(Collectors.toList())
-        + " and " + exceptions.size() + " exception(s); " + server.getState());
-    int i = 0;
-    for(Exception e : exceptions) {
-      LOG.info("  " + i++ + ": " + e);
-      LOG.trace("TRACE", e);
-    }
-    return new ResultAndTerm(result, newTerm);
-  }
-
-  enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN}
-
-  private static class ResultAndTerm {
-    final Result result;
-    final long term;
-
-    ResultAndTerm(Result result, long term) {
-      this.result = result;
-      this.term = term;
-    }
-  }
-
-  private final RaftServer server;
-  private ExecutorCompletionService<RequestVoteReplyProto> service;
-  private ExecutorService executor;
-  private volatile boolean running;
-  /**
-   * The Raft configuration should not change while the peer is in candidate
-   * state. If the configuration changes, another peer should be acting as a
-   * leader and this LeaderElection session should end.
-   */
-  private final RaftConfiguration conf;
-  private final Collection<RaftPeer> others;
-
-  LeaderElection(RaftServer server) {
-    this.server = server;
-    conf = server.getRaftConf();
-    others = conf.getOtherPeers(server.getId());
-    this.running = true;
-  }
-
-  void stopRunning() {
-    this.running = false;
-  }
-
-  private void initExecutor() {
-    Preconditions.checkState(!others.isEmpty());
-    executor = Executors.newFixedThreadPool(others.size(),
-        new ThreadFactoryBuilder().setDaemon(true).build());
-    service = new ExecutorCompletionService<>(executor);
-  }
-
-  @Override
-  public void run() {
-    try {
-      askForVotes();
-    } catch (InterruptedException e) {
-      // the leader election thread is interrupted. The peer may already step
-      // down to a follower. The leader election should skip.
-      LOG.info("The leader election thread of peer {} is interrupted. " +
-          "Currently role: {}.", server.getId(), server.getRole());
-    } catch (IOException e) {
-      LOG.warn("Failed to persist votedFor/term. Exit the leader election.", 
e);
-      stopRunning();
-    }
-  }
-
-  /**
-   * After a peer changes its role to candidate, it invokes this method to
-   * send out requestVote rpc to all other peers.
-   */
-  private void askForVotes() throws InterruptedException, IOException {
-    final ServerState state = server.getState();
-    while (running && server.isCandidate()) {
-      // one round of requestVotes
-      final long electionTerm;
-      synchronized (server) {
-        electionTerm = state.initElection();
-        server.getState().persistMetadata();
-      }
-      LOG.info(state.getSelfId() + ": begin an election in Term "
-          + electionTerm);
-
-      TermIndex lastEntry = ServerProtoUtils.toTermIndex(
-          state.getLog().getLastEntry());
-      if (lastEntry == null) {
-        // lastEntry may need to be derived from snapshot
-        SnapshotInfo snapshot = state.getLatestSnapshot();
-        if (snapshot != null) {
-          lastEntry = snapshot.getTermIndex();
-        }
-      }
-
-      final ResultAndTerm r;
-      if (others.isEmpty()) {
-        r = new ResultAndTerm(Result.PASSED, electionTerm);
-      } else {
-        try {
-          initExecutor();
-          int submitted = submitRequests(electionTerm, lastEntry);
-          r = waitForResults(electionTerm, submitted);
-        } finally {
-          if (executor != null) {
-            executor.shutdown();
-          }
-        }
-      }
-
-      synchronized (server) {
-        if (electionTerm != state.getCurrentTerm() || !running ||
-            !server.isCandidate()) {
-          return; // term already passed or no longer a candidate.
-        }
-
-        switch (r.result) {
-          case PASSED:
-            server.changeToLeader();
-            return;
-          case SHUTDOWN:
-            LOG.info("{} received shutdown response when requesting votes.",
-                server.getId());
-            server.close();
-            return;
-          case REJECTED:
-          case DISCOVERED_A_NEW_TERM:
-            final long term = r.term > server.getState().getCurrentTerm() ?
-                r.term : server.getState().getCurrentTerm();
-            server.changeToFollower(term, true);
-            return;
-          case TIMEOUT:
-            // should start another election
-        }
-      }
-    }
-  }
-
-  private int submitRequests(final long electionTerm, final TermIndex 
lastEntry) {
-    int submitted = 0;
-    for (final RaftPeer peer : others) {
-      final RequestVoteRequestProto r = server.createRequestVoteRequest(
-          peer.getId(), electionTerm, lastEntry);
-      service.submit(
-          () -> server.getServerRpc().sendRequestVote(r));
-      submitted++;
-    }
-    return submitted;
-  }
-
-  private ResultAndTerm waitForResults(final long electionTerm,
-      final int submitted) throws InterruptedException {
-    final Timestamp timeout = new 
Timestamp().addTimeMs(server.getRandomTimeoutMs());
-    final List<RequestVoteReplyProto> responses = new ArrayList<>();
-    final List<Exception> exceptions = new ArrayList<>();
-    int waitForNum = submitted;
-    Collection<String> votedPeers = new ArrayList<>();
-    while (waitForNum > 0 && running && server.isCandidate()) {
-      final long waitTime = -timeout.elapsedTimeMs();
-      if (waitTime <= 0) {
-        return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
-      }
-
-      try {
-        final Future<RequestVoteReplyProto> future = service.poll(
-            waitTime, TimeUnit.MILLISECONDS);
-        if (future == null) {
-          continue; // poll timeout, continue to return Result.TIMEOUT
-        }
-
-        final RequestVoteReplyProto r = future.get();
-        responses.add(r);
-        if (r.getShouldShutdown()) {
-          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
-        }
-        if (r.getTerm() > electionTerm) {
-          return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
-              exceptions, r.getTerm());
-        }
-        if (r.getServerReply().getSuccess()) {
-          votedPeers.add(r.getServerReply().getReplyId());
-          if (conf.hasMajority(votedPeers, server.getId())) {
-            return logAndReturn(Result.PASSED, responses, exceptions, -1);
-          }
-        }
-      } catch(ExecutionException e) {
-        LOG.info("Got exception when requesting votes: " + e);
-        LOG.trace("TRACE", e);
-        exceptions.add(e);
-      }
-      waitForNum--;
-    }
-    // received all the responses
-    return logAndReturn(Result.REJECTED, responses, exceptions, -1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LeaderState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/LeaderState.java 
b/raft-server/src/main/java/org/apache/raft/server/LeaderState.java
deleted file mode 100644
index 1576311..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LeaderState.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.proto.RaftProtos.LeaderNoOp;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.LeaderState.StateUpdateEventType.*;
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-
-/**
- * States for leader only. It contains three different types of processors:
- * 1. RPC senders: each thread is appending log to a follower
- * 2. EventProcessor: a single thread updating the raft server's state based on
- *                    status of log appending response
- * 3. PendingRequestHandler: a handler sending back responses to clients when
- *                           corresponding log entries are committed
- */
-public class LeaderState {
-  private static final Logger LOG = RaftServer.LOG;
-
-  enum StateUpdateEventType {
-    STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
-  }
-
-  enum BootStrapProgress {
-    NOPROGRESS, PROGRESSING, CAUGHTUP
-  }
-
-  static class StateUpdateEvent {
-    final StateUpdateEventType type;
-    final long newTerm;
-
-    StateUpdateEvent(StateUpdateEventType type, long newTerm) {
-      this.type = type;
-      this.newTerm = newTerm;
-    }
-  }
-
-  static final StateUpdateEvent UPDATE_COMMIT_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1);
-  static final StateUpdateEvent STAGING_PROGRESS_EVENT =
-      new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1);
-
-  private final RaftServer server;
-  private final RaftLog raftLog;
-  private final long currentTerm;
-  private volatile ConfigurationStagingState stagingState;
-  private List<List<FollowerInfo>> voterLists;
-
-  /**
-   * The list of threads appending entries to followers.
-   * The list is protected by the RaftServer's lock.
-   */
-  private final List<LogAppender> senders;
-  private final BlockingQueue<StateUpdateEvent> eventQ;
-  private final EventProcessor processor;
-  private final PendingRequests pendingRequests;
-  private volatile boolean running = true;
-
-  private final int stagingCatchupGap;
-  private final int snapshotChunkMaxSize;
-  private final int syncInterval;
-
-  LeaderState(RaftServer server, RaftProperties properties) {
-    this.server = server;
-
-    stagingCatchupGap = properties.getInt(
-        RAFT_SERVER_STAGING_CATCHUP_GAP_KEY,
-        RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT);
-    snapshotChunkMaxSize = properties.getInt(
-        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY,
-        RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT);
-    syncInterval = properties.getInt(
-        RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY,
-        RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT);
-
-    final ServerState state = server.getState();
-    this.raftLog = state.getLog();
-    this.currentTerm = state.getCurrentTerm();
-    eventQ = new ArrayBlockingQueue<>(4096);
-    processor = new EventProcessor();
-    pendingRequests = new PendingRequests(server);
-
-    final RaftConfiguration conf = server.getRaftConf();
-    Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
-    final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final long nextIndex = raftLog.getNextIndex();
-    senders = new ArrayList<>(others.size());
-    for (RaftPeer p : others) {
-      FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
-      senders.add(server.getLogAppenderFactory().getLogAppender(server, this, 
f));
-    }
-    voterLists = divideFollowers(conf);
-  }
-
-  void start() {
-    // In the beginning of the new term, replicate an empty entry in order
-    // to finally commit entries in the previous term.
-    // Also this message can help identify the last committed index when
-    // the leader peer is just started.
-    final LogEntryProto placeHolder = LogEntryProto.newBuilder()
-        .setTerm(server.getState().getCurrentTerm())
-        .setIndex(raftLog.getNextIndex())
-        .setNoOp(LeaderNoOp.newBuilder()).build();
-    raftLog.append(placeHolder);
-
-    processor.start();
-    startSenders();
-  }
-
-  private void startSenders() {
-    senders.forEach(Thread::start);
-  }
-
-  void stop() {
-    this.running = false;
-    // do not interrupt event processor since it may be in the middle of 
logSync
-    for (LogAppender sender : senders) {
-      sender.stopSender();
-      sender.interrupt();
-    }
-    try {
-      pendingRequests.sendNotLeaderResponses();
-    } catch (IOException e) {
-      LOG.warn("Caught exception in sendNotLeaderResponses", e);
-    }
-  }
-
-  void notifySenders() {
-    senders.forEach(LogAppender::notifyAppend);
-  }
-
-  boolean inStagingState() {
-    return stagingState != null;
-  }
-
-  ConfigurationStagingState getStagingState() {
-    return stagingState;
-  }
-
-  long getCurrentTerm() {
-    return currentTerm;
-  }
-
-  int getSnapshotChunkMaxSize() {
-    return snapshotChunkMaxSize;
-  }
-
-  int getSyncInterval() {
-    return syncInterval;
-  }
-
-  /**
-   * Start bootstrapping new peers
-   */
-  PendingRequest startSetConfiguration(SetConfigurationRequest request) {
-    Preconditions.checkState(running && !inStagingState());
-
-    RaftPeer[] peersInNewConf = request.getPeersInNewConf();
-    Collection<RaftPeer> peersToBootStrap = RaftConfiguration
-        .computeNewPeers(peersInNewConf, server.getRaftConf());
-
-    // add the request to the pending queue
-    final PendingRequest pending = pendingRequests.addConfRequest(request);
-
-    ConfigurationStagingState stagingState = new ConfigurationStagingState(
-        peersToBootStrap, new 
PeerConfiguration(Arrays.asList(peersInNewConf)));
-    Collection<RaftPeer> newPeers = stagingState.getNewPeers();
-    // set the staging state
-    this.stagingState = stagingState;
-
-    if (newPeers.isEmpty()) {
-      applyOldNewConf();
-    } else {
-      // update the LeaderState's sender list
-      addSenders(newPeers);
-    }
-    return pending;
-  }
-
-  PendingRequest addPendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    return pendingRequests.addPendingRequest(index, request, entry);
-  }
-
-  private void applyOldNewConf() {
-    final ServerState state = server.getState();
-    final RaftConfiguration current = server.getRaftConf();
-    final RaftConfiguration oldNewConf= 
stagingState.generateOldNewConf(current,
-        state.getLog().getNextIndex());
-    // apply the (old, new) configuration to log, and use it as the current 
conf
-    long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
-    updateConfiguration(index, oldNewConf);
-
-    this.stagingState = null;
-    notifySenders();
-  }
-
-  private void updateConfiguration(long logIndex, RaftConfiguration newConf) {
-    voterLists = divideFollowers(newConf);
-    server.getState().setRaftConf(logIndex, newConf);
-  }
-
-  /**
-   * After receiving a setConfiguration request, the leader should update its
-   * RpcSender list.
-   */
-  void addSenders(Collection<RaftPeer> newMembers) {
-    final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final long nextIndex = raftLog.getNextIndex();
-    for (RaftPeer peer : newMembers) {
-      FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false);
-      LogAppender sender = server.getLogAppenderFactory()
-          .getLogAppender(server, this, f);
-      senders.add(sender);
-      sender.start();
-    }
-  }
-
-  /**
-   * Update the RpcSender list based on the current configuration
-   */
-  private void updateSenders(RaftConfiguration conf) {
-    Preconditions.checkState(conf.isStable() && !inStagingState());
-    Iterator<LogAppender> iterator = senders.iterator();
-    while (iterator.hasNext()) {
-      LogAppender sender = iterator.next();
-      if (!conf.containsInConf(sender.getFollower().getPeer().getId())) {
-        iterator.remove();
-        sender.stopSender();
-        sender.interrupt();
-      }
-    }
-  }
-
-  void submitUpdateStateEvent(StateUpdateEvent event) {
-    try {
-      eventQ.put(event);
-    } catch (InterruptedException e) {
-      LOG.info("Interrupted when adding event {} into the queue", event);
-    }
-  }
-
-  private void prepare() {
-    synchronized (server) {
-      if (running) {
-        final RaftConfiguration conf = server.getRaftConf();
-        if (conf.isTransitional() && server.getState().isConfCommitted()) {
-          // the configuration is in transitional state, and has been committed
-          // so it is time to generate and replicate (new) conf.
-          replicateNewConf();
-        }
-      }
-    }
-  }
-
-  /**
-   * The processor thread takes the responsibility to update the raft server's
-   * state, such as changing to follower, or updating the committed index.
-   */
-  private class EventProcessor extends Daemon {
-    @Override
-    public void run() {
-      // apply an empty message; check if necessary to replicate (new) conf
-      prepare();
-
-      while (running) {
-        try {
-          StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(),
-              TimeUnit.MILLISECONDS);
-          synchronized (server) {
-            if (running) {
-              handleEvent(event);
-            }
-          }
-          // the updated configuration does not need to be sync'ed here
-        } catch (InterruptedException e) {
-          if (!running) {
-            LOG.info("The LeaderState gets is stopped");
-          } else {
-            LOG.warn("The leader election thread of peer {} is interrupted. "
-                + "Currently role: {}.", server.getId(), server.getRole());
-            throw new RuntimeException(e);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to persist new votedFor/term.", e);
-          // the failure should happen while changing the state to follower
-          // thus the in-memory state should have been updated
-          Preconditions.checkState(!running);
-        }
-      }
-    }
-  }
-
-  private void handleEvent(StateUpdateEvent e) throws IOException {
-    if (e == null) {
-      if (inStagingState()) {
-        checkNewPeers();
-      }
-    } else {
-      if (e.type == STEPDOWN) {
-        server.changeToFollower(e.newTerm, true);
-      } else if (e.type == UPDATECOMMIT) {
-        updateLastCommitted();
-      } else if (e.type == STAGINGPROGRESS) {
-        checkNewPeers();
-      }
-    }
-  }
-
-  /**
-   * So far we use a simple implementation for catchup checking:
-   * 1. If the latest rpc time of the remote peer is before 3 * max_timeout,
-   *    the peer made no progress for that long. We should fail the whole
-   *    setConfiguration request.
-   * 2. If the peer's matching index is just behind for a small gap, and the
-   *    peer was updated recently (within max_timeout), declare the peer as
-   *    caught-up.
-   * 3. Otherwise the peer is making progressing. Keep waiting.
-   */
-  private BootStrapProgress checkProgress(FollowerInfo follower,
-      long committed) {
-    Preconditions.checkArgument(!follower.isAttendingVote());
-    final Timestamp progressTime = new 
Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final Timestamp timeoutTime = new 
Timestamp().addTimeMs(-3*server.getMaxTimeoutMs());
-    if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
-      LOG.debug("{} detects a follower {} timeout for bootstrapping," +
-              " timeoutTime: {}", server.getId(), follower, timeoutTime);
-      return BootStrapProgress.NOPROGRESS;
-    } else if (follower.getMatchIndex() + stagingCatchupGap > committed
-        && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
-      return BootStrapProgress.CAUGHTUP;
-    } else {
-      return BootStrapProgress.PROGRESSING;
-    }
-  }
-
-  private Collection<BootStrapProgress> checkAllProgress(long committed) {
-    Preconditions.checkState(inStagingState());
-    return senders.stream()
-        .filter(sender -> !sender.getFollower().isAttendingVote())
-        .map(sender -> checkProgress(sender.getFollower(), committed))
-        .collect(Collectors.toCollection(ArrayList::new));
-  }
-
-  private void checkNewPeers() {
-    if (!inStagingState()) {
-      // it is possible that the bootstrapping is done and we still have
-      // remaining STAGINGPROGRESS event to handle.
-      updateLastCommitted();
-    } else {
-      final long committedIndex = server.getState().getLog()
-          .getLastCommittedIndex();
-      Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
-      if (reports.contains(BootStrapProgress.NOPROGRESS)) {
-        LOG.debug("{} fails the setConfiguration request", server.getId());
-        stagingState.fail();
-      } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
-        // all caught up!
-        applyOldNewConf();
-        for (LogAppender sender : senders) {
-          sender.getFollower().startAttendVote();
-        }
-      }
-    }
-  }
-
-  boolean isBootStrappingPeer(String peerId) {
-    return inStagingState() && getStagingState().contains(peerId);
-  }
-
-  private void updateLastCommitted() {
-    final String selfId = server.getId();
-    final RaftConfiguration conf = server.getRaftConf();
-    long majorityInNewConf = computeLastCommitted(voterLists.get(0),
-        conf.containsInConf(selfId));
-    final long oldLastCommitted = raftLog.getLastCommittedIndex();
-    final LogEntryProto[] entriesToCommit;
-    if (!conf.isTransitional()) {
-      // copy the entries that may get committed out of the raftlog, to prevent
-      // the possible race that the log gets purged after the statemachine does
-      // a snapshot
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majorityInNewConf, oldLastCommitted) + 1);
-      server.getState().updateStatemachine(majorityInNewConf, currentTerm);
-    } else { // configuration is in transitional state
-      long majorityInOldConf = computeLastCommitted(voterLists.get(1),
-          conf.containsInOldConf(selfId));
-      final long majority = Math.min(majorityInNewConf, majorityInOldConf);
-      entriesToCommit = raftLog.getEntries(oldLastCommitted + 1,
-          Math.max(majority, oldLastCommitted) + 1);
-      server.getState().updateStatemachine(majority, currentTerm);
-    }
-    checkAndUpdateConfiguration(entriesToCommit);
-  }
-
-  private boolean committedConf(LogEntryProto[] entries) {
-    final long currentCommitted = raftLog.getLastCommittedIndex();
-    for (LogEntryProto entry : entries) {
-      if (entry.getIndex() <= currentCommitted &&
-          ProtoUtils.isConfigurationLogEntry(entry)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) {
-    final RaftConfiguration conf = server.getRaftConf();
-    if (committedConf(entriesToCheck)) {
-      if (conf.isTransitional()) {
-        replicateNewConf();
-      } else { // the (new) log entry has been committed
-        LOG.debug("{} sends success to setConfiguration request", 
server.getId());
-        pendingRequests.replySetConfiguration();
-        // if the leader is not included in the current configuration, step 
down
-        if (!conf.containsInConf(server.getId())) {
-          LOG.info("{} is not included in the new configuration {}. Step 
down.",
-              server.getId(), conf);
-          try {
-            // leave some time for all RPC senders to send out new conf entry
-            Thread.sleep(server.getMinTimeoutMs());
-          } catch (InterruptedException ignored) {
-          }
-          // the pending request handler will send NotLeaderException for
-          // pending client requests when it stops
-          server.close();
-        }
-      }
-    }
-  }
-
-  /**
-   * when the (old, new) log entry has been committed, should replicate (new):
-   * 1) append (new) to log
-   * 2) update conf to (new)
-   * 3) update RpcSenders list
-   * 4) start replicating the log entry
-   */
-  private void replicateNewConf() {
-    final RaftConfiguration conf = server.getRaftConf();
-    final RaftConfiguration newConf = RaftConfiguration.newBuilder()
-        .setConf(conf)
-        .setLogEntryIndex(raftLog.getNextIndex())
-        .build();
-    // stop the LogAppender if the corresponding follower is no longer in the 
conf
-    updateSenders(newConf);
-    long index = raftLog.append(server.getState().getCurrentTerm(), newConf);
-    updateConfiguration(index, newConf);
-    notifySenders();
-  }
-
-  private long computeLastCommitted(List<FollowerInfo> followers,
-      boolean includeSelf) {
-    final int length = includeSelf ? followers.size() + 1 : followers.size();
-    final long[] indices = new long[length];
-    for (int i = 0; i < followers.size(); i++) {
-      indices[i] = followers.get(i).getMatchIndex();
-    }
-    if (includeSelf) {
-      // note that we also need to wait for the local disk I/O
-      indices[length - 1] = raftLog.getLatestFlushedIndex();
-    }
-
-    Arrays.sort(indices);
-    return indices[(indices.length - 1) / 2];
-  }
-
-  private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
-    List<List<FollowerInfo>> lists = new ArrayList<>(2);
-    List<FollowerInfo> listForNew = senders.stream()
-        .filter(sender -> 
conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(LogAppender::getFollower)
-        .collect(Collectors.toList());
-    lists.add(listForNew);
-    if (conf.isTransitional()) {
-      List<FollowerInfo> listForOld = senders.stream()
-          .filter(sender -> 
conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(LogAppender::getFollower)
-          .collect(Collectors.toList());
-      lists.add(listForOld);
-    }
-    return lists;
-  }
-
-  PendingRequest returnNoConfChange(SetConfigurationRequest r) {
-    PendingRequest pending = new PendingRequest(r);
-    pending.setSuccessReply(null);
-    return pending;
-  }
-
-  void replyPendingRequest(long logIndex, CompletableFuture<Message> message) {
-    pendingRequests.replyPendingRequest(logIndex, message);
-  }
-
-  TransactionContext getTransactionContext(long index) {
-    return pendingRequests.getTransactionContext(index);
-  }
-
-  private class ConfigurationStagingState {
-    private final Map<String, RaftPeer> newPeers;
-    private final PeerConfiguration newConf;
-
-    ConfigurationStagingState(Collection<RaftPeer> newPeers,
-        PeerConfiguration newConf) {
-      Map<String, RaftPeer> map = new HashMap<>();
-      for (RaftPeer peer : newPeers) {
-        map.put(peer.getId(), peer);
-      }
-      this.newPeers = Collections.unmodifiableMap(map);
-      this.newConf = newConf;
-    }
-
-    RaftConfiguration generateOldNewConf(RaftConfiguration current,
-        long logIndex) {
-      return RaftConfiguration.newBuilder()
-          .setConf(newConf)
-          .setOldConf(current)
-          .setLogEntryIndex(logIndex)
-          .build();
-    }
-
-    Collection<RaftPeer> getNewPeers() {
-      return newPeers.values();
-    }
-
-    boolean contains(String peerId) {
-      return newPeers.containsKey(peerId);
-    }
-
-    void fail() {
-      Iterator<LogAppender> iterator = senders.iterator();
-      while (iterator.hasNext()) {
-        LogAppender sender = iterator.next();
-        if (!sender.getFollower().isAttendingVote()) {
-          iterator.remove();
-          sender.stopSender();
-          sender.interrupt();
-        }
-      }
-      LeaderState.this.stagingState = null;
-      // send back failure response to client's request
-      pendingRequests.failSetConfiguration(
-          new ReconfigurationTimeoutException("Fail to set configuration "
-              + newConf + ". Timeout when bootstrapping new peers."));
-    }
-  }
-}

Reply via email to