Move org.apache.raft.server.* to org.apache.raft.server.impl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c36810ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c36810ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c36810ed Branch: refs/heads/master Commit: c36810ed8dbd0e202f98ba57bd5730e7edff6863 Parents: dd0b631 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Sat Dec 31 17:23:13 2016 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Sat Dec 31 17:23:13 2016 +0800 ---------------------------------------------------------------------- .../arithmetic/ArithmeticStateMachine.java | 2 +- .../java/org/apache/raft/TestBatchAppend.java | 2 +- .../org/apache/raft/TestRestartRaftPeer.java | 2 +- .../TestRaftStateMachineException.java | 2 +- .../org/apache/raft/grpc/RaftGRpcService.java | 6 +- .../grpc/client/RaftClientProtocolService.java | 2 +- .../raft/grpc/server/GRpcLogAppender.java | 8 +- .../server/PipelinedLogAppenderFactory.java | 10 +- .../grpc/server/RaftServerProtocolService.java | 2 +- .../raft/grpc/MiniRaftClusterWithGRpc.java | 8 +- .../grpc/TestNotLeaderExceptionWithGrpc.java | 2 +- .../grpc/TestRaftReconfigurationWithGRpc.java | 4 +- .../org/apache/raft/grpc/TestRaftStream.java | 4 +- .../org/apache/raft/grpc/TestRaftWithGrpc.java | 6 +- .../raft/hadooprpc/server/HadoopRpcService.java | 3 + .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 4 +- .../TestRaftReconfigurationWithHadoopRpc.java | 2 +- .../raft/hadooprpc/TestRaftWithHadoopRpc.java | 4 +- .../raft/netty/server/NettyRpcService.java | 6 +- .../raft/netty/MiniRaftClusterWithNetty.java | 7 +- .../netty/TestRaftReconfigurationWithNetty.java | 2 +- .../apache/raft/netty/TestRaftWithNetty.java | 4 +- .../raft/server/ConfigurationManager.java | 91 --- .../org/apache/raft/server/FollowerInfo.java | 103 --- .../org/apache/raft/server/FollowerState.java | 91 --- .../org/apache/raft/server/LeaderElection.java 
| 242 ------ .../org/apache/raft/server/LeaderState.java | 581 -------------- .../org/apache/raft/server/LogAppender.java | 481 ------------ .../apache/raft/server/LogAppenderFactory.java | 31 - .../apache/raft/server/PeerConfiguration.java | 90 --- .../org/apache/raft/server/PendingRequest.java | 87 --- .../org/apache/raft/server/PendingRequests.java | 134 ---- .../apache/raft/server/RaftConfiguration.java | 261 ------- .../java/org/apache/raft/server/RaftServer.java | 750 ------------------- .../raft/server/RaftServerConfigKeys.java | 1 + .../apache/raft/server/RaftServerConstants.java | 46 -- .../org/apache/raft/server/RaftServerRpc.java | 44 -- .../apache/raft/server/RequestDispatcher.java | 137 ---- .../main/java/org/apache/raft/server/Role.java | 25 - .../org/apache/raft/server/ServerState.java | 346 --------- .../apache/raft/server/StateMachineUpdater.java | 213 ------ .../raft/server/impl/ConfigurationManager.java | 91 +++ .../apache/raft/server/impl/FollowerInfo.java | 103 +++ .../apache/raft/server/impl/FollowerState.java | 91 +++ .../apache/raft/server/impl/LeaderElection.java | 241 ++++++ .../apache/raft/server/impl/LeaderState.java | 581 ++++++++++++++ .../apache/raft/server/impl/LogAppender.java | 480 ++++++++++++ .../raft/server/impl/LogAppenderFactory.java | 31 + .../raft/server/impl/PeerConfiguration.java | 90 +++ .../apache/raft/server/impl/PendingRequest.java | 87 +++ .../raft/server/impl/PendingRequests.java | 129 ++++ .../raft/server/impl/RaftConfiguration.java | 261 +++++++ .../org/apache/raft/server/impl/RaftServer.java | 749 ++++++++++++++++++ .../raft/server/impl/RaftServerConstants.java | 46 ++ .../apache/raft/server/impl/RaftServerRpc.java | 44 ++ .../raft/server/impl/RequestDispatcher.java | 137 ++++ .../java/org/apache/raft/server/impl/Role.java | 25 + .../raft/server/impl/ServerProtoUtils.java | 3 +- .../apache/raft/server/impl/ServerState.java | 345 +++++++++ .../raft/server/impl/StateMachineUpdater.java | 213 ++++++ 
.../raft/server/storage/LogInputStream.java | 2 +- .../raft/server/storage/LogOutputStream.java | 2 +- .../apache/raft/server/storage/LogReader.java | 2 +- .../apache/raft/server/storage/LogSegment.java | 2 +- .../raft/server/storage/MemoryRaftLog.java | 4 +- .../org/apache/raft/server/storage/RaftLog.java | 6 +- .../raft/server/storage/RaftLogCache.java | 4 +- .../raft/server/storage/RaftLogWorker.java | 4 +- .../apache/raft/server/storage/RaftStorage.java | 2 +- .../server/storage/RaftStorageDirectory.java | 2 +- .../raft/server/storage/SegmentedRaftLog.java | 6 +- .../raft/statemachine/BaseStateMachine.java | 4 +- .../statemachine/SimpleStateMachineStorage.java | 2 +- .../apache/raft/statemachine/SnapshotInfo.java | 2 +- .../raft/statemachine/SnapshotInfoImpl.java | 2 +- .../apache/raft/statemachine/StateMachine.java | 12 +- .../raft/statemachine/TermIndexTracker.java | 2 +- .../java/org/apache/raft/MiniRaftCluster.java | 8 +- .../java/org/apache/raft/RaftBasicTests.java | 8 +- .../raft/RaftNotLeaderExceptionBaseTest.java | 10 +- .../test/java/org/apache/raft/RaftTestUtil.java | 6 +- .../server/BlockRequestHandlingInjection.java | 84 --- .../server/DelayLocalExecutionInjection.java | 67 -- .../server/RaftReconfigurationBaseTest.java | 576 -------------- .../apache/raft/server/RaftServerTestUtil.java | 67 -- .../impl/BlockRequestHandlingInjection.java | 84 +++ .../impl/DelayLocalExecutionInjection.java | 67 ++ .../impl/RaftReconfigurationBaseTest.java | 577 ++++++++++++++ .../raft/server/impl/RaftServerTestUtil.java | 67 ++ .../MiniRaftClusterWithSimulatedRpc.java | 2 +- .../server/simulation/SimulatedServerRpc.java | 6 +- ...TestRaftReconfigurationWithSimulatedRpc.java | 2 +- .../simulation/TestRaftWithSimulatedRpc.java | 2 +- .../server/storage/TestRaftLogReadWrite.java | 4 +- .../raft/server/storage/TestRaftLogSegment.java | 8 +- .../raft/server/storage/TestRaftStorage.java | 2 +- .../server/storage/TestSegmentedRaftLog.java | 4 +- 
.../raft/statemachine/RaftSnapshotBaseTest.java | 6 +- .../SimpleStateMachine4Testing.java | 2 +- .../raft/statemachine/TestStateMachine.java | 2 +- 100 files changed, 4651 insertions(+), 4663 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java index 4f038d2..b684669 100644 --- a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java +++ b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java @@ -22,7 +22,7 @@ import org.apache.raft.examples.arithmetic.expression.Expression; import org.apache.raft.protocol.Message; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.server.RaftServerConstants; +import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.protocol.TermIndex; import org.apache.raft.server.storage.RaftStorage; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java index 396bd47..c730245 100644 --- a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java +++ b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java @@ -22,7 +22,7 @@ import 
org.apache.raft.RaftTestUtil.SimpleMessage; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.statemachine.SimpleStateMachine4Testing; import org.apache.raft.statemachine.StateMachine; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java index 10741f9..fadd7a8 100644 --- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java +++ b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java @@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java index 61d7989..2fa6e53 100644 --- 
a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java +++ b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java @@ -24,7 +24,7 @@ import org.apache.raft.client.RaftClient; import org.apache.raft.examples.RaftExamplesTestUtil; import org.apache.raft.protocol.Message; import org.apache.raft.protocol.StateMachineException; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.util.RaftUtils; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java index 8259083..c9a0daf 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java @@ -23,9 +23,9 @@ import org.apache.raft.grpc.client.RaftClientProtocolService; import org.apache.raft.grpc.server.RaftServerProtocolClient; import org.apache.raft.grpc.server.RaftServerProtocolService; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.RequestDispatcher; +import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.Server; import org.apache.raft.shaded.io.grpc.ServerBuilder; import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder; 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java index d8704f6..32dbac7 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.raft.client.impl.ClientProtoUtils; import org.apache.raft.grpc.RaftGrpcUtil; import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.server.RequestDispatcher; +import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java index 2f1fc90..b171bed 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java @@ -20,10 +20,10 @@ package org.apache.raft.grpc.server; import com.google.common.base.Preconditions; import org.apache.raft.grpc.RaftGRpcService; import org.apache.raft.grpc.RaftGrpcConfigKeys; -import org.apache.raft.server.FollowerInfo; -import org.apache.raft.server.LeaderState; -import 
org.apache.raft.server.LogAppender; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.FollowerInfo; +import org.apache.raft.server.impl.LeaderState; +import org.apache.raft.server.impl.LogAppender; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.shaded.io.grpc.Status; import org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java index 90882df..5f01980 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java @@ -17,11 +17,11 @@ */ package org.apache.raft.grpc.server; -import org.apache.raft.server.FollowerInfo; -import org.apache.raft.server.LeaderState; -import org.apache.raft.server.LogAppender; -import org.apache.raft.server.LogAppenderFactory; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.FollowerInfo; +import org.apache.raft.server.impl.LeaderState; +import org.apache.raft.server.impl.LogAppender; +import org.apache.raft.server.impl.LogAppenderFactory; +import org.apache.raft.server.impl.RaftServer; public class PipelinedLogAppenderFactory implements LogAppenderFactory { @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java index d0fba89..2f06c59 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java @@ -18,7 +18,7 @@ package org.apache.raft.grpc.server; import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.server.RequestDispatcher; +import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.*; import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java index 8316db4..420ee88 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java @@ -25,10 +25,10 @@ import org.apache.raft.conf.RaftProperties; import org.apache.raft.grpc.client.RaftClientSenderWithGrpc; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.BlockRequestHandlingInjection; -import org.apache.raft.server.DelayLocalExecutionInjection; -import org.apache.raft.server.LogAppenderFactory; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.BlockRequestHandlingInjection; +import org.apache.raft.server.impl.DelayLocalExecutionInjection; +import org.apache.raft.server.impl.LogAppenderFactory; 
+import org.apache.raft.server.impl.RaftServer; import org.apache.raft.util.NetUtils; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java index 18f9751..a8357c9 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java @@ -21,7 +21,7 @@ import org.apache.raft.MiniRaftCluster; import org.apache.raft.RaftNotLeaderExceptionBaseTest; import org.apache.raft.conf.RaftProperties; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.LogAppenderFactory; +import org.apache.raft.server.impl.LogAppenderFactory; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java index 7912858..83e6c62 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java @@ -20,8 +20,8 @@ package org.apache.raft.grpc; import org.apache.log4j.Level; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; import org.apache.raft.grpc.server.RaftServerProtocolService; -import org.apache.raft.server.LogAppenderFactory; -import org.apache.raft.server.RaftReconfigurationBaseTest; 
+import org.apache.raft.server.impl.LogAppenderFactory; +import org.apache.raft.server.impl.RaftReconfigurationBaseTest; import org.apache.raft.util.RaftUtils; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java index 981ac5d..0c85854 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java @@ -23,8 +23,8 @@ import org.apache.raft.conf.RaftProperties; import org.apache.raft.grpc.client.AppendStreamer; import org.apache.raft.grpc.client.RaftOutputStream; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.LogAppenderFactory; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.LogAppenderFactory; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.util.RaftUtils; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java index 083948a..b2d104b 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java @@ -20,9 +20,9 @@ package org.apache.raft.grpc; import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import 
org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.BlockRequestHandlingInjection; -import org.apache.raft.server.LogAppenderFactory; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.BlockRequestHandlingInjection; +import org.apache.raft.server.impl.LogAppenderFactory; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.util.RaftUtils; import org.junit.Assert; import org.junit.BeforeClass; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java index 3c22f12..ad4beec 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java @@ -26,6 +26,9 @@ import org.apache.raft.hadooprpc.client.RaftClientProtocolPB; import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.*; +import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.com.google.protobuf.BlockingService; import org.apache.raft.shaded.com.google.protobuf.ServiceException; import org.apache.raft.shaded.proto.RaftProtos.*; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java 
b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java index ed3da7f..7f7ef49 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -26,8 +26,8 @@ import org.apache.raft.conf.RaftProperties; import org.apache.raft.hadooprpc.client.HadoopClientRequestSender; import org.apache.raft.hadooprpc.server.HadoopRpcService; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.DelayLocalExecutionInjection; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.DelayLocalExecutionInjection; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.RaftServerConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java index fef13d1..0116280 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java @@ -20,7 +20,7 @@ package org.apache.raft.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.raft.MiniRaftCluster; import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.RaftReconfigurationBaseTest; +import org.apache.raft.server.impl.RaftReconfigurationBaseTest; import java.io.IOException; 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java index e63b119..3971274 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java @@ -21,8 +21,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import org.apache.raft.client.RaftClient; -import org.apache.raft.server.BlockRequestHandlingInjection; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.BlockRequestHandlingInjection; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.util.RaftUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java index 9c728ce..c0d751d 100644 --- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java +++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java @@ -29,9 +29,9 @@ import org.apache.raft.client.impl.ClientProtoUtils; import org.apache.raft.netty.NettyRpcProxy; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerRpc; -import 
org.apache.raft.server.RequestDispatcher; +import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java index 3ccb335..4958e9a 100644 --- a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java +++ b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java @@ -24,11 +24,10 @@ import org.apache.raft.conf.RaftProperties; import org.apache.raft.netty.client.NettyClientRequestSender; import org.apache.raft.netty.server.NettyRpcService; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.DelayLocalExecutionInjection; -import org.apache.raft.server.RaftConfiguration; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.DelayLocalExecutionInjection; +import org.apache.raft.server.impl.RaftConfiguration; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.util.NetUtils; -import org.apache.raft.util.RaftUtils; import java.io.IOException; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java ---------------------------------------------------------------------- diff --git 
a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java index 17e0a21..c4dd914 100644 --- a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java +++ b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftReconfigurationWithNetty.java @@ -18,7 +18,7 @@ package org.apache.raft.netty; import org.apache.raft.MiniRaftCluster; -import org.apache.raft.server.RaftReconfigurationBaseTest; +import org.apache.raft.server.impl.RaftReconfigurationBaseTest; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java index 916c374..fb75b7b 100644 --- a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java +++ b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java @@ -20,8 +20,8 @@ package org.apache.raft.netty; import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import org.apache.raft.client.RaftClient; -import org.apache.raft.server.BlockRequestHandlingInjection; -import org.apache.raft.server.RaftServer; +import org.apache.raft.server.impl.BlockRequestHandlingInjection; +import org.apache.raft.server.impl.RaftServer; import org.apache.raft.util.RaftUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java b/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java 
deleted file mode 100644 index 405cf2f..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/ConfigurationManager.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import java.util.*; - -/** - * Maintain the mappings between log index and corresponding raft configuration. - * Initialized when starting the raft peer. The mappings are loaded from the - * raft log, and updated while appending/truncating configuration related log - * entries. - */ -public class ConfigurationManager { - private RaftConfiguration initialConf; - private final NavigableMap<Long, RaftConfiguration> configurations = - new TreeMap<>(); - /** - * The current raft configuration. If configurations is not empty, should be - * the last entry of the map. Otherwise is initialConf. 
- */ - private RaftConfiguration currentConf; - - public ConfigurationManager(RaftConfiguration initialConf) { - setInitialConf(initialConf); - } - - @VisibleForTesting - public synchronized void setInitialConf(RaftConfiguration initialConf) { - /** - * initialConf should actually be defined as "final". But for tests we want - * to change the initial configuration to reflect the correct port binding. - */ - this.initialConf = initialConf; - this.currentConf = initialConf; - } - - public synchronized void addConfiguration(long logIndex, - RaftConfiguration conf) { - Preconditions.checkArgument(configurations.isEmpty() || - configurations.lastEntry().getKey() < logIndex); - configurations.put(logIndex, conf); - this.currentConf = conf; - } - - synchronized RaftConfiguration getCurrent() { - return currentConf; - } - - /** - * Remove all the configurations whose log index is >= the given index. - * @param index The given index. All the configurations whose log index is >= - * this value will be removed. - * @return The configuration with largest log index < the given index. - */ - synchronized RaftConfiguration removeConfigurations(long index) { - SortedMap<Long, RaftConfiguration> toRemove = configurations.tailMap(index); - for (Iterator<Map.Entry<Long, RaftConfiguration>> iter = - toRemove.entrySet().iterator(); iter.hasNext();) { - iter.next(); - iter.remove(); - } - currentConf = configurations.isEmpty() ? 
initialConf : - configurations.lastEntry().getValue(); - return currentConf; - } - - @VisibleForTesting - synchronized int numOfConf() { - return 1 + configurations.size(); - } - - // TODO: remove Configuration entries after they are committed -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java b/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java deleted file mode 100644 index 8e5c131..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/FollowerInfo.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.util.Timestamp; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -public class FollowerInfo { - private final RaftPeer peer; - private final AtomicReference<Timestamp> lastRpcResponseTime; - private final AtomicReference<Timestamp> lastRpcSendTime; - private long nextIndex; - private final AtomicLong matchIndex; - private volatile boolean attendVote; - - FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex, - boolean attendVote) { - this.peer = peer; - this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); - this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); - this.nextIndex = nextIndex; - this.matchIndex = new AtomicLong(0); - this.attendVote = attendVote; - } - - public void updateMatchIndex(final long matchIndex) { - this.matchIndex.set(matchIndex); - } - - long getMatchIndex() { - return matchIndex.get(); - } - - public synchronized long getNextIndex() { - return nextIndex; - } - - public synchronized void updateNextIndex(long i) { - nextIndex = i; - } - - public synchronized void decreaseNextIndex(long targetIndex) { - if (nextIndex > 0) { - nextIndex = Math.min(nextIndex - 1, targetIndex); - } - } - - @Override - public String toString() { - return peer.getId() + "(next=" + nextIndex + ", match=" + matchIndex + "," + - " attendVote=" + attendVote + - ", lastRpcSendTime=" + lastRpcSendTime + - ", lastRpcResponseTime=" + lastRpcResponseTime + ")"; - } - - void startAttendVote() { - attendVote = true; - } - - public boolean isAttendingVote() { - return attendVote; - } - - public RaftPeer getPeer() { - return peer; - } - - /** Update lastRpcResponseTime to the current time. 
*/ - public void updateLastRpcResponseTime() { - lastRpcResponseTime.set(new Timestamp()); - } - - public Timestamp getLastRpcResponseTime() { - return lastRpcResponseTime.get(); - } - - /** Update lastRpcSendTime to the current time. */ - public void updateLastRpcSendTime() { - lastRpcSendTime.set(new Timestamp()); - } - - public Timestamp getLastRpcTime() { - return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/FollowerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/FollowerState.java b/raft-server/src/main/java/org/apache/raft/server/FollowerState.java deleted file mode 100644 index e5293e4..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/FollowerState.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server; - -import org.apache.raft.util.Daemon; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; - -/** - * Used when the peer is a follower. Used to track the election timeout. 
- */ -class FollowerState extends Daemon { - static final Logger LOG = RaftServer.LOG; - - private final RaftServer server; - - private volatile Timestamp lastRpcTime = new Timestamp(); - private volatile boolean monitorRunning = true; - private volatile boolean inLogSync = false; - - FollowerState(RaftServer server) { - this.server = server; - } - - void updateLastRpcTime(boolean inLogSync) { - lastRpcTime = new Timestamp(); - LOG.trace("{} update last rpc time to {}", server.getId(), lastRpcTime); - this.inLogSync = inLogSync; - } - - Timestamp getLastRpcTime() { - return lastRpcTime; - } - - boolean shouldWithholdVotes() { - return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs(); - } - - void stopRunning() { - this.monitorRunning = false; - } - - @Override - public void run() { - while (monitorRunning && server.isFollower()) { - final long electionTimeout = server.getRandomTimeoutMs(); - try { - Thread.sleep(electionTimeout); - if (!monitorRunning || !server.isFollower()) { - LOG.info("{} heartbeat monitor quit", server.getId()); - break; - } - synchronized (server) { - if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) { - LOG.info("{} changes to {}, lastRpcTime:{}, electionTimeout:{}", - server.getId(), Role.CANDIDATE, lastRpcTime, electionTimeout); - // election timeout, should become a candidate - server.changeToCandidate(); - break; - } - } - } catch (InterruptedException e) { - LOG.info(this + " was interrupted: " + e); - LOG.trace("TRACE", e); - return; - } catch (Exception e) { - LOG.warn(this + " caught an exception", e); - } - } - } - - @Override - public String toString() { - return server.getId() + ": " + getClass().getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java 
b/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java deleted file mode 100644 index 8a97494..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/LeaderElection.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.ServerProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.RequestVoteRequestProto; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -class LeaderElection extends Daemon { - public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class); - - private ResultAndTerm logAndReturn(Result result, - List<RequestVoteReplyProto> responses, - List<Exception> exceptions, long newTerm) { - LOG.info(server.getId() + ": Election " + result + "; received " - + responses.size() + " response(s) " - + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList()) - + " and " + exceptions.size() + " exception(s); " + server.getState()); - int i = 0; - for(Exception e : exceptions) { - LOG.info(" " + i++ + ": " + e); - LOG.trace("TRACE", e); - } - return new ResultAndTerm(result, newTerm); - } - - enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN} - - private static class ResultAndTerm { - final Result result; - final long term; - - ResultAndTerm(Result result, long term) { - this.result = result; - this.term = term; - } - } - - private final RaftServer server; - private ExecutorCompletionService<RequestVoteReplyProto> service; - private ExecutorService executor; - private volatile boolean running; - /** - * The Raft 
configuration should not change while the peer is in candidate - * state. If the configuration changes, another peer should be acting as a - * leader and this LeaderElection session should end. - */ - private final RaftConfiguration conf; - private final Collection<RaftPeer> others; - - LeaderElection(RaftServer server) { - this.server = server; - conf = server.getRaftConf(); - others = conf.getOtherPeers(server.getId()); - this.running = true; - } - - void stopRunning() { - this.running = false; - } - - private void initExecutor() { - Preconditions.checkState(!others.isEmpty()); - executor = Executors.newFixedThreadPool(others.size(), - new ThreadFactoryBuilder().setDaemon(true).build()); - service = new ExecutorCompletionService<>(executor); - } - - @Override - public void run() { - try { - askForVotes(); - } catch (InterruptedException e) { - // the leader election thread is interrupted. The peer may already step - // down to a follower. The leader election should skip. - LOG.info("The leader election thread of peer {} is interrupted. " + - "Currently role: {}.", server.getId(), server.getRole()); - } catch (IOException e) { - LOG.warn("Failed to persist votedFor/term. Exit the leader election.", e); - stopRunning(); - } - } - - /** - * After a peer changes its role to candidate, it invokes this method to - * send out requestVote rpc to all other peers. 
- */ - private void askForVotes() throws InterruptedException, IOException { - final ServerState state = server.getState(); - while (running && server.isCandidate()) { - // one round of requestVotes - final long electionTerm; - synchronized (server) { - electionTerm = state.initElection(); - server.getState().persistMetadata(); - } - LOG.info(state.getSelfId() + ": begin an election in Term " - + electionTerm); - - TermIndex lastEntry = ServerProtoUtils.toTermIndex( - state.getLog().getLastEntry()); - if (lastEntry == null) { - // lastEntry may need to be derived from snapshot - SnapshotInfo snapshot = state.getLatestSnapshot(); - if (snapshot != null) { - lastEntry = snapshot.getTermIndex(); - } - } - - final ResultAndTerm r; - if (others.isEmpty()) { - r = new ResultAndTerm(Result.PASSED, electionTerm); - } else { - try { - initExecutor(); - int submitted = submitRequests(electionTerm, lastEntry); - r = waitForResults(electionTerm, submitted); - } finally { - if (executor != null) { - executor.shutdown(); - } - } - } - - synchronized (server) { - if (electionTerm != state.getCurrentTerm() || !running || - !server.isCandidate()) { - return; // term already passed or no longer a candidate. - } - - switch (r.result) { - case PASSED: - server.changeToLeader(); - return; - case SHUTDOWN: - LOG.info("{} received shutdown response when requesting votes.", - server.getId()); - server.close(); - return; - case REJECTED: - case DISCOVERED_A_NEW_TERM: - final long term = r.term > server.getState().getCurrentTerm() ? 
- r.term : server.getState().getCurrentTerm(); - server.changeToFollower(term, true); - return; - case TIMEOUT: - // should start another election - } - } - } - } - - private int submitRequests(final long electionTerm, final TermIndex lastEntry) { - int submitted = 0; - for (final RaftPeer peer : others) { - final RequestVoteRequestProto r = server.createRequestVoteRequest( - peer.getId(), electionTerm, lastEntry); - service.submit( - () -> server.getServerRpc().sendRequestVote(r)); - submitted++; - } - return submitted; - } - - private ResultAndTerm waitForResults(final long electionTerm, - final int submitted) throws InterruptedException { - final Timestamp timeout = new Timestamp().addTimeMs(server.getRandomTimeoutMs()); - final List<RequestVoteReplyProto> responses = new ArrayList<>(); - final List<Exception> exceptions = new ArrayList<>(); - int waitForNum = submitted; - Collection<String> votedPeers = new ArrayList<>(); - while (waitForNum > 0 && running && server.isCandidate()) { - final long waitTime = -timeout.elapsedTimeMs(); - if (waitTime <= 0) { - return logAndReturn(Result.TIMEOUT, responses, exceptions, -1); - } - - try { - final Future<RequestVoteReplyProto> future = service.poll( - waitTime, TimeUnit.MILLISECONDS); - if (future == null) { - continue; // poll timeout, continue to return Result.TIMEOUT - } - - final RequestVoteReplyProto r = future.get(); - responses.add(r); - if (r.getShouldShutdown()) { - return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1); - } - if (r.getTerm() > electionTerm) { - return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses, - exceptions, r.getTerm()); - } - if (r.getServerReply().getSuccess()) { - votedPeers.add(r.getServerReply().getReplyId()); - if (conf.hasMajority(votedPeers, server.getId())) { - return logAndReturn(Result.PASSED, responses, exceptions, -1); - } - } - } catch(ExecutionException e) { - LOG.info("Got exception when requesting votes: " + e); - LOG.trace("TRACE", e); - 
exceptions.add(e); - } - waitForNum--; - } - // received all the responses - return logAndReturn(Result.REJECTED, responses, exceptions, -1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LeaderState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/LeaderState.java b/raft-server/src/main/java/org/apache/raft/server/LeaderState.java deleted file mode 100644 index 1576311..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/LeaderState.java +++ /dev/null @@ -1,581 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.*; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.shaded.proto.RaftProtos.LeaderNoOp; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.Timestamp; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.raft.server.LeaderState.StateUpdateEventType.*; -import static org.apache.raft.server.RaftServerConfigKeys.*; - -/** - * States for leader only. It contains three different types of processors: - * 1. RPC senders: each thread is appending log to a follower - * 2. EventProcessor: a single thread updating the raft server's state based on - * status of log appending response - * 3. 
PendingRequestHandler: a handler sending back responses to clients when - * corresponding log entries are committed - */ -public class LeaderState { - private static final Logger LOG = RaftServer.LOG; - - enum StateUpdateEventType { - STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS - } - - enum BootStrapProgress { - NOPROGRESS, PROGRESSING, CAUGHTUP - } - - static class StateUpdateEvent { - final StateUpdateEventType type; - final long newTerm; - - StateUpdateEvent(StateUpdateEventType type, long newTerm) { - this.type = type; - this.newTerm = newTerm; - } - } - - static final StateUpdateEvent UPDATE_COMMIT_EVENT = - new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); - static final StateUpdateEvent STAGING_PROGRESS_EVENT = - new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); - - private final RaftServer server; - private final RaftLog raftLog; - private final long currentTerm; - private volatile ConfigurationStagingState stagingState; - private List<List<FollowerInfo>> voterLists; - - /** - * The list of threads appending entries to followers. - * The list is protected by the RaftServer's lock. 
- */ - private final List<LogAppender> senders; - private final BlockingQueue<StateUpdateEvent> eventQ; - private final EventProcessor processor; - private final PendingRequests pendingRequests; - private volatile boolean running = true; - - private final int stagingCatchupGap; - private final int snapshotChunkMaxSize; - private final int syncInterval; - - LeaderState(RaftServer server, RaftProperties properties) { - this.server = server; - - stagingCatchupGap = properties.getInt( - RAFT_SERVER_STAGING_CATCHUP_GAP_KEY, - RAFT_SERVER_STAGING_CATCHUP_GAP_DEFAULT); - snapshotChunkMaxSize = properties.getInt( - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_KEY, - RAFT_SNAPSHOT_CHUNK_MAX_SIZE_DEFAULT); - syncInterval = properties.getInt( - RAFT_SERVER_RPC_SLEEP_TIME_MS_KEY, - RAFT_SERVER_RPC_SLEEP_TIME_MS_DEFAULT); - - final ServerState state = server.getState(); - this.raftLog = state.getLog(); - this.currentTerm = state.getCurrentTerm(); - eventQ = new ArrayBlockingQueue<>(4096); - processor = new EventProcessor(); - pendingRequests = new PendingRequests(server); - - final RaftConfiguration conf = server.getRaftConf(); - Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); - final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final long nextIndex = raftLog.getNextIndex(); - senders = new ArrayList<>(others.size()); - for (RaftPeer p : others) { - FollowerInfo f = new FollowerInfo(p, t, nextIndex, true); - senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f)); - } - voterLists = divideFollowers(conf); - } - - void start() { - // In the beginning of the new term, replicate an empty entry in order - // to finally commit entries in the previous term. - // Also this message can help identify the last committed index when - // the leader peer is just started. 
- final LogEntryProto placeHolder = LogEntryProto.newBuilder() - .setTerm(server.getState().getCurrentTerm()) - .setIndex(raftLog.getNextIndex()) - .setNoOp(LeaderNoOp.newBuilder()).build(); - raftLog.append(placeHolder); - - processor.start(); - startSenders(); - } - - private void startSenders() { - senders.forEach(Thread::start); - } - - void stop() { - this.running = false; - // do not interrupt event processor since it may be in the middle of logSync - for (LogAppender sender : senders) { - sender.stopSender(); - sender.interrupt(); - } - try { - pendingRequests.sendNotLeaderResponses(); - } catch (IOException e) { - LOG.warn("Caught exception in sendNotLeaderResponses", e); - } - } - - void notifySenders() { - senders.forEach(LogAppender::notifyAppend); - } - - boolean inStagingState() { - return stagingState != null; - } - - ConfigurationStagingState getStagingState() { - return stagingState; - } - - long getCurrentTerm() { - return currentTerm; - } - - int getSnapshotChunkMaxSize() { - return snapshotChunkMaxSize; - } - - int getSyncInterval() { - return syncInterval; - } - - /** - * Start bootstrapping new peers - */ - PendingRequest startSetConfiguration(SetConfigurationRequest request) { - Preconditions.checkState(running && !inStagingState()); - - RaftPeer[] peersInNewConf = request.getPeersInNewConf(); - Collection<RaftPeer> peersToBootStrap = RaftConfiguration - .computeNewPeers(peersInNewConf, server.getRaftConf()); - - // add the request to the pending queue - final PendingRequest pending = pendingRequests.addConfRequest(request); - - ConfigurationStagingState stagingState = new ConfigurationStagingState( - peersToBootStrap, new PeerConfiguration(Arrays.asList(peersInNewConf))); - Collection<RaftPeer> newPeers = stagingState.getNewPeers(); - // set the staging state - this.stagingState = stagingState; - - if (newPeers.isEmpty()) { - applyOldNewConf(); - } else { - // update the LeaderState's sender list - addSenders(newPeers); - } - return pending; 
- } - - PendingRequest addPendingRequest(long index, RaftClientRequest request, - TransactionContext entry) { - return pendingRequests.addPendingRequest(index, request, entry); - } - - private void applyOldNewConf() { - final ServerState state = server.getState(); - final RaftConfiguration current = server.getRaftConf(); - final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current, - state.getLog().getNextIndex()); - // apply the (old, new) configuration to log, and use it as the current conf - long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); - updateConfiguration(index, oldNewConf); - - this.stagingState = null; - notifySenders(); - } - - private void updateConfiguration(long logIndex, RaftConfiguration newConf) { - voterLists = divideFollowers(newConf); - server.getState().setRaftConf(logIndex, newConf); - } - - /** - * After receiving a setConfiguration request, the leader should update its - * RpcSender list. - */ - void addSenders(Collection<RaftPeer> newMembers) { - final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final long nextIndex = raftLog.getNextIndex(); - for (RaftPeer peer : newMembers) { - FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false); - LogAppender sender = server.getLogAppenderFactory() - .getLogAppender(server, this, f); - senders.add(sender); - sender.start(); - } - } - - /** - * Update the RpcSender list based on the current configuration - */ - private void updateSenders(RaftConfiguration conf) { - Preconditions.checkState(conf.isStable() && !inStagingState()); - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!conf.containsInConf(sender.getFollower().getPeer().getId())) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } - } - - void submitUpdateStateEvent(StateUpdateEvent event) { - try { - eventQ.put(event); - } catch (InterruptedException e) { - 
LOG.info("Interrupted when adding event {} into the queue", event); - } - } - - private void prepare() { - synchronized (server) { - if (running) { - final RaftConfiguration conf = server.getRaftConf(); - if (conf.isTransitional() && server.getState().isConfCommitted()) { - // the configuration is in transitional state, and has been committed - // so it is time to generate and replicate (new) conf. - replicateNewConf(); - } - } - } - } - - /** - * The processor thread takes the responsibility to update the raft server's - * state, such as changing to follower, or updating the committed index. - */ - private class EventProcessor extends Daemon { - @Override - public void run() { - // apply an empty message; check if necessary to replicate (new) conf - prepare(); - - while (running) { - try { - StateUpdateEvent event = eventQ.poll(server.getMaxTimeoutMs(), - TimeUnit.MILLISECONDS); - synchronized (server) { - if (running) { - handleEvent(event); - } - } - // the updated configuration does not need to be sync'ed here - } catch (InterruptedException e) { - if (!running) { - LOG.info("The LeaderState gets is stopped"); - } else { - LOG.warn("The leader election thread of peer {} is interrupted. 
" - + "Currently role: {}.", server.getId(), server.getRole()); - throw new RuntimeException(e); - } - } catch (IOException e) { - LOG.warn("Failed to persist new votedFor/term.", e); - // the failure should happen while changing the state to follower - // thus the in-memory state should have been updated - Preconditions.checkState(!running); - } - } - } - } - - private void handleEvent(StateUpdateEvent e) throws IOException { - if (e == null) { - if (inStagingState()) { - checkNewPeers(); - } - } else { - if (e.type == STEPDOWN) { - server.changeToFollower(e.newTerm, true); - } else if (e.type == UPDATECOMMIT) { - updateLastCommitted(); - } else if (e.type == STAGINGPROGRESS) { - checkNewPeers(); - } - } - } - - /** - * So far we use a simple implementation for catchup checking: - * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, - * the peer made no progress for that long. We should fail the whole - * setConfiguration request. - * 2. If the peer's matching index is just behind for a small gap, and the - * peer was updated recently (within max_timeout), declare the peer as - * caught-up. - * 3. Otherwise the peer is making progressing. Keep waiting. 
- */ - private BootStrapProgress checkProgress(FollowerInfo follower, - long committed) { - Preconditions.checkArgument(!follower.isAttendingVote()); - final Timestamp progressTime = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); - final Timestamp timeoutTime = new Timestamp().addTimeMs(-3*server.getMaxTimeoutMs()); - if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { - LOG.debug("{} detects a follower {} timeout for bootstrapping," + - " timeoutTime: {}", server.getId(), follower, timeoutTime); - return BootStrapProgress.NOPROGRESS; - } else if (follower.getMatchIndex() + stagingCatchupGap > committed - && follower.getLastRpcResponseTime().compareTo(progressTime) > 0) { - return BootStrapProgress.CAUGHTUP; - } else { - return BootStrapProgress.PROGRESSING; - } - } - - private Collection<BootStrapProgress> checkAllProgress(long committed) { - Preconditions.checkState(inStagingState()); - return senders.stream() - .filter(sender -> !sender.getFollower().isAttendingVote()) - .map(sender -> checkProgress(sender.getFollower(), committed)) - .collect(Collectors.toCollection(ArrayList::new)); - } - - private void checkNewPeers() { - if (!inStagingState()) { - // it is possible that the bootstrapping is done and we still have - // remaining STAGINGPROGRESS event to handle. - updateLastCommitted(); - } else { - final long committedIndex = server.getState().getLog() - .getLastCommittedIndex(); - Collection<BootStrapProgress> reports = checkAllProgress(committedIndex); - if (reports.contains(BootStrapProgress.NOPROGRESS)) { - LOG.debug("{} fails the setConfiguration request", server.getId()); - stagingState.fail(); - } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { - // all caught up! 
- applyOldNewConf(); - for (LogAppender sender : senders) { - sender.getFollower().startAttendVote(); - } - } - } - } - - boolean isBootStrappingPeer(String peerId) { - return inStagingState() && getStagingState().contains(peerId); - } - - private void updateLastCommitted() { - final String selfId = server.getId(); - final RaftConfiguration conf = server.getRaftConf(); - long majorityInNewConf = computeLastCommitted(voterLists.get(0), - conf.containsInConf(selfId)); - final long oldLastCommitted = raftLog.getLastCommittedIndex(); - final LogEntryProto[] entriesToCommit; - if (!conf.isTransitional()) { - // copy the entries that may get committed out of the raftlog, to prevent - // the possible race that the log gets purged after the statemachine does - // a snapshot - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majorityInNewConf, oldLastCommitted) + 1); - server.getState().updateStatemachine(majorityInNewConf, currentTerm); - } else { // configuration is in transitional state - long majorityInOldConf = computeLastCommitted(voterLists.get(1), - conf.containsInOldConf(selfId)); - final long majority = Math.min(majorityInNewConf, majorityInOldConf); - entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, - Math.max(majority, oldLastCommitted) + 1); - server.getState().updateStatemachine(majority, currentTerm); - } - checkAndUpdateConfiguration(entriesToCommit); - } - - private boolean committedConf(LogEntryProto[] entries) { - final long currentCommitted = raftLog.getLastCommittedIndex(); - for (LogEntryProto entry : entries) { - if (entry.getIndex() <= currentCommitted && - ProtoUtils.isConfigurationLogEntry(entry)) { - return true; - } - } - return false; - } - - private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) { - final RaftConfiguration conf = server.getRaftConf(); - if (committedConf(entriesToCheck)) { - if (conf.isTransitional()) { - replicateNewConf(); - } else { // the (new) log entry has been committed - 
LOG.debug("{} sends success to setConfiguration request", server.getId()); - pendingRequests.replySetConfiguration(); - // if the leader is not included in the current configuration, step down - if (!conf.containsInConf(server.getId())) { - LOG.info("{} is not included in the new configuration {}. Step down.", - server.getId(), conf); - try { - // leave some time for all RPC senders to send out new conf entry - Thread.sleep(server.getMinTimeoutMs()); - } catch (InterruptedException ignored) { - } - // the pending request handler will send NotLeaderException for - // pending client requests when it stops - server.close(); - } - } - } - } - - /** - * when the (old, new) log entry has been committed, should replicate (new): - * 1) append (new) to log - * 2) update conf to (new) - * 3) update RpcSenders list - * 4) start replicating the log entry - */ - private void replicateNewConf() { - final RaftConfiguration conf = server.getRaftConf(); - final RaftConfiguration newConf = RaftConfiguration.newBuilder() - .setConf(conf) - .setLogEntryIndex(raftLog.getNextIndex()) - .build(); - // stop the LogAppender if the corresponding follower is no longer in the conf - updateSenders(newConf); - long index = raftLog.append(server.getState().getCurrentTerm(), newConf); - updateConfiguration(index, newConf); - notifySenders(); - } - - private long computeLastCommitted(List<FollowerInfo> followers, - boolean includeSelf) { - final int length = includeSelf ? 
followers.size() + 1 : followers.size(); - final long[] indices = new long[length]; - for (int i = 0; i < followers.size(); i++) { - indices[i] = followers.get(i).getMatchIndex(); - } - if (includeSelf) { - // note that we also need to wait for the local disk I/O - indices[length - 1] = raftLog.getLatestFlushedIndex(); - } - - Arrays.sort(indices); - return indices[(indices.length - 1) / 2]; - } - - private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) { - List<List<FollowerInfo>> lists = new ArrayList<>(2); - List<FollowerInfo> listForNew = senders.stream() - .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId())) - .map(LogAppender::getFollower) - .collect(Collectors.toList()); - lists.add(listForNew); - if (conf.isTransitional()) { - List<FollowerInfo> listForOld = senders.stream() - .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId())) - .map(LogAppender::getFollower) - .collect(Collectors.toList()); - lists.add(listForOld); - } - return lists; - } - - PendingRequest returnNoConfChange(SetConfigurationRequest r) { - PendingRequest pending = new PendingRequest(r); - pending.setSuccessReply(null); - return pending; - } - - void replyPendingRequest(long logIndex, CompletableFuture<Message> message) { - pendingRequests.replyPendingRequest(logIndex, message); - } - - TransactionContext getTransactionContext(long index) { - return pendingRequests.getTransactionContext(index); - } - - private class ConfigurationStagingState { - private final Map<String, RaftPeer> newPeers; - private final PeerConfiguration newConf; - - ConfigurationStagingState(Collection<RaftPeer> newPeers, - PeerConfiguration newConf) { - Map<String, RaftPeer> map = new HashMap<>(); - for (RaftPeer peer : newPeers) { - map.put(peer.getId(), peer); - } - this.newPeers = Collections.unmodifiableMap(map); - this.newConf = newConf; - } - - RaftConfiguration generateOldNewConf(RaftConfiguration current, - long logIndex) { - return 
RaftConfiguration.newBuilder() - .setConf(newConf) - .setOldConf(current) - .setLogEntryIndex(logIndex) - .build(); - } - - Collection<RaftPeer> getNewPeers() { - return newPeers.values(); - } - - boolean contains(String peerId) { - return newPeers.containsKey(peerId); - } - - void fail() { - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!sender.getFollower().isAttendingVote()) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } - LeaderState.this.stagingState = null; - // send back failure response to client's request - pendingRequests.failSetConfiguration( - new ReconfigurationTimeoutException("Fail to set configuration " - + newConf + ". Timeout when bootstrapping new peers.")); - } - } -}
