Rename RaftServer to RaftServerImpl (will add an interface.)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/22f3ee0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/22f3ee0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/22f3ee0b Branch: refs/heads/master Commit: 22f3ee0b46a5485deafa84939eb5290938b5552b Parents: c36810e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Mon Jan 2 20:10:28 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Mon Jan 2 20:10:28 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/raft/TestBatchAppend.java | 4 +- .../org/apache/raft/TestRestartRaftPeer.java | 4 +- .../TestRaftStateMachineException.java | 4 +- .../org/apache/raft/grpc/RaftGRpcService.java | 4 +- .../raft/grpc/server/GRpcLogAppender.java | 6 +- .../server/PipelinedLogAppenderFactory.java | 6 +- .../raft/grpc/MiniRaftClusterWithGRpc.java | 14 +- .../org/apache/raft/grpc/TestRaftStream.java | 12 +- .../org/apache/raft/grpc/TestRaftWithGrpc.java | 4 +- .../raft/hadooprpc/server/HadoopRpcService.java | 4 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 12 +- .../raft/hadooprpc/TestRaftWithHadoopRpc.java | 4 +- .../raft/netty/server/NettyRpcService.java | 4 +- .../raft/netty/MiniRaftClusterWithNetty.java | 14 +- .../apache/raft/netty/TestRaftWithNetty.java | 4 +- .../apache/raft/server/impl/FollowerState.java | 6 +- .../apache/raft/server/impl/LeaderElection.java | 4 +- .../apache/raft/server/impl/LeaderState.java | 6 +- .../apache/raft/server/impl/LogAppender.java | 6 +- .../raft/server/impl/LogAppenderFactory.java | 8 +- .../raft/server/impl/PendingRequests.java | 6 +- .../org/apache/raft/server/impl/RaftServer.java | 749 ------------------- .../apache/raft/server/impl/RaftServerImpl.java | 749 +++++++++++++++++++ .../raft/server/impl/RequestDispatcher.java | 4 +- .../apache/raft/server/impl/ServerState.java | 10 +- .../raft/server/impl/StateMachineUpdater.java | 4 +- .../raft/server/storage/RaftLogWorker.java | 8 +- .../raft/server/storage/SegmentedRaftLog.java | 6 +- .../java/org/apache/raft/MiniRaftCluster.java | 60 +- .../java/org/apache/raft/RaftBasicTests.java | 8 +- .../raft/RaftNotLeaderExceptionBaseTest.java | 4 +- .../test/java/org/apache/raft/RaftTestUtil.java | 22 +- .../impl/BlockRequestHandlingInjection.java | 6 +- .../impl/RaftReconfigurationBaseTest.java | 4 +- .../raft/server/impl/RaftServerTestUtil.java | 2 +- .../MiniRaftClusterWithSimulatedRpc.java | 10 +- .../server/simulation/SimulatedServerRpc.java | 6 +- .../simulation/TestRaftWithSimulatedRpc.java | 4 +- .../raft/statemachine/RaftSnapshotBaseTest.java | 4 +- .../raft/statemachine/TestStateMachine.java | 8 +- 40 files changed, 902 insertions(+), 902 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java index c730245..1fd8ba0 100644 --- a/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java +++ b/raft-examples/src/test/java/org/apache/raft/TestBatchAppend.java @@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.statemachine.SimpleStateMachine4Testing; import org.apache.raft.statemachine.StateMachine; @@ -55,7 +55,7 @@ import static org.apache.raft.server.RaftServerConfigKeys.*; public class TestBatchAppend { static Logger LOG = LoggerFactory.getLogger(TestBatchAppend.class); static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java index fadd7a8..e04d141 100644 --- a/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java +++ b/raft-examples/src/test/java/org/apache/raft/TestRestartRaftPeer.java @@ -22,7 +22,7 @@ import org.apache.raft.RaftTestUtil.SimpleMessage; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; import org.apache.raft.examples.RaftExamplesTestUtil; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; @@ -49,7 +49,7 @@ import java.util.Collection; public class TestRestartRaftPeer { static Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class); static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java index 2fa6e53..0832579 100644 --- a/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java +++ b/raft-examples/src/test/java/org/apache/raft/statemachine/TestRaftStateMachineException.java @@ -24,7 +24,7 @@ import org.apache.raft.client.RaftClient; import org.apache.raft.examples.RaftExamplesTestUtil; import org.apache.raft.protocol.Message; import org.apache.raft.protocol.StateMachineException; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.util.RaftUtils; @@ -42,7 +42,7 @@ import static org.junit.Assert.fail; @RunWith(Parameterized.class) public class TestRaftStateMachineException { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java index c9a0daf..d465724 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java @@ -23,7 +23,7 @@ import org.apache.raft.grpc.client.RaftClientProtocolService; import org.apache.raft.grpc.server.RaftServerProtocolClient; import org.apache.raft.grpc.server.RaftServerProtocolService; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.Server; @@ -54,7 +54,7 @@ public class RaftGRpcService implements RaftServerRpc { Collections.synchronizedMap(new HashMap<>()); private final String selfId; - public RaftGRpcService(RaftServer raftServer, RaftProperties properties) { + public RaftGRpcService(RaftServerImpl raftServer, RaftProperties properties) { int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, RAFT_GRPC_SERVER_PORT_DEFAULT); int maxMessageSize = properties.getInt( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java index b171bed..a8a39bb 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java @@ -23,7 +23,7 @@ import org.apache.raft.grpc.RaftGrpcConfigKeys; import org.apache.raft.server.impl.FollowerInfo; import org.apache.raft.server.impl.LeaderState; import org.apache.raft.server.impl.LogAppender; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.shaded.io.grpc.Status; import org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto; @@ -56,8 +56,8 @@ public class GRpcLogAppender extends LogAppender { private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver; - public GRpcLogAppender(RaftServer server, LeaderState leaderState, - FollowerInfo f) { + public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState, + FollowerInfo f) { super(server, leaderState, f); RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java index 5f01980..cc2e513 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java @@ -21,12 +21,12 @@ import org.apache.raft.server.impl.FollowerInfo; import org.apache.raft.server.impl.LeaderState; import org.apache.raft.server.impl.LogAppender; import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; public class PipelinedLogAppenderFactory implements LogAppenderFactory { @Override - public LogAppender getLogAppender(RaftServer server, LeaderState state, - FollowerInfo f) { + public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f) { return new GRpcLogAppender(server, state, f); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java index 420ee88..359dabd 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java @@ -28,7 +28,7 @@ import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.impl.BlockRequestHandlingInjection; import org.apache.raft.server.impl.DelayLocalExecutionInjection; import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.util.NetUtils; import java.io.IOException; @@ -71,10 +71,10 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } private static Map<RaftPeer, RaftGRpcService> initRpcServices( - Collection<RaftServer> servers, RaftProperties prop) throws IOException { + Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException { final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>(); - for (RaftServer s : servers) { + for (RaftServerImpl s : servers) { final RaftGRpcService rpc = new RaftGRpcService(s, prop); peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); } @@ -88,11 +88,11 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { @Override protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServer> newServers, boolean startService) + Collection<RaftServerImpl> newServers, boolean startService) throws IOException { final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, properties); for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) { - RaftServer server = servers.get(entry.getKey().getId()); + RaftServerImpl server = servers.get(entry.getKey().getId()); server.setServerRpc(entry.getValue()); if (!startService) { BlockRequestHandlingInjection.getInstance().blockReplier(server.getId()); @@ -104,8 +104,8 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - protected RaftServer setPeerRpc(RaftPeer peer) throws IOException { - RaftServer server = servers.get(peer.getId()); + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { + RaftServerImpl server = servers.get(peer.getId()); int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort(); int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java index 0c85854..82a4e13 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java @@ -24,7 +24,7 @@ import org.apache.raft.grpc.client.AppendStreamer; import org.apache.raft.grpc.client.RaftOutputStream; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.util.RaftUtils; @@ -95,7 +95,7 @@ public class TestRaftStream { cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); cluster.start(); - RaftServer leader = waitForLeader(cluster); + RaftServerImpl leader = waitForLeader(cluster); int count = 1; try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1", @@ -137,7 +137,7 @@ public class TestRaftStream { cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); cluster.start(); - RaftServer leader = waitForLeader(cluster); + RaftServerImpl leader = waitForLeader(cluster); RaftOutputStream out = new RaftOutputStream(prop, "writer", cluster.getPeers(), leader.getId()); @@ -215,7 +215,7 @@ public class TestRaftStream { cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); cluster.start(); - RaftServer leader = waitForLeader(cluster); + RaftServerImpl leader = waitForLeader(cluster); RaftOutputStream out = new RaftOutputStream(prop, "writer", cluster.getPeers(), leader.getId()); @@ -273,7 +273,7 @@ public class TestRaftStream { prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); cluster.start(); - final RaftServer leader = waitForLeader(cluster); + final RaftServerImpl leader = waitForLeader(cluster); final AtomicBoolean running = new AtomicBoolean(true); final AtomicBoolean success = new AtomicBoolean(false); @@ -301,7 +301,7 @@ public class TestRaftStream { // force change the leader RaftTestUtil.waitAndKillLeader(cluster, true); - final RaftServer newLeader = waitForLeader(cluster); + final RaftServerImpl newLeader = waitForLeader(cluster); Assert.assertNotEquals(leader.getId(), newLeader.getId()); Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java index b2d104b..c6667b4 100644 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java +++ b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java @@ -22,7 +22,7 @@ import org.apache.raft.RaftBasicTests; import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; import org.apache.raft.server.impl.BlockRequestHandlingInjection; import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.util.RaftUtils; import org.junit.Assert; import org.junit.BeforeClass; @@ -34,7 +34,7 @@ import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPEND public class TestRaftWithGrpc extends RaftBasicTests { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); } private final MiniRaftClusterWithGRpc cluster; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java index ad4beec..eb92372 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java @@ -26,7 +26,7 @@ import org.apache.raft.hadooprpc.client.RaftClientProtocolPB; import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.*; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.com.google.protobuf.BlockingService; @@ -56,7 +56,7 @@ public class HadoopRpcService implements RaftServerRpc { private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies; - public HadoopRpcService(RaftServer server, final Configuration conf) + public HadoopRpcService(RaftServerImpl server, final Configuration conf) throws IOException { this.proxies = new PeerProxyMap<>( p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java index 7f7ef49..87b4d1c 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -27,7 +27,7 @@ import org.apache.raft.hadooprpc.client.HadoopClientRequestSender; import org.apache.raft.hadooprpc.server.HadoopRpcService; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.impl.DelayLocalExecutionInjection; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.RaftServerConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +70,10 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { } private static Map<RaftPeer, HadoopRpcService> initRpcServices( - Collection<RaftServer> servers, Configuration hadoopConf) throws IOException { + Collection<RaftServerImpl> servers, Configuration hadoopConf) throws IOException { final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>(); - for(RaftServer s : servers) { + for(RaftServerImpl s : servers) { final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf); peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); } @@ -81,11 +81,11 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { } @Override - protected RaftServer setPeerRpc(RaftPeer peer) throws IOException { + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { Configuration hconf = new Configuration(hadoopConf); hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress()); - RaftServer server = servers.get(peer.getId()); + RaftServerImpl server = servers.get(peer.getId()); final HadoopRpcService rpc = new HadoopRpcService(server, hconf); Preconditions.checkState( rpc.getInetSocketAddress().toString().contains(peer.getAddress()), @@ -97,7 +97,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { @Override public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServer> newServers, boolean startService) + Collection<RaftServerImpl> newServers, boolean startService) throws IOException { return addNewPeers(initRpcServices(newServers, hadoopConf), newServers, startService); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java index 3971274..4848196 100644 --- a/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java +++ b/raft-hadoop/src/test/java/org/apache/raft/hadooprpc/TestRaftWithHadoopRpc.java @@ -22,7 +22,7 @@ import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import org.apache.raft.client.RaftClient; import org.apache.raft.server.impl.BlockRequestHandlingInjection; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.util.RaftUtils; import org.junit.Test; @@ -33,7 +33,7 @@ import static org.apache.raft.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServerR public class TestRaftWithHadoopRpc extends RaftBasicTests { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java index c0d751d..b5b8550 100644 --- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java +++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java @@ -29,7 +29,7 @@ import org.apache.raft.client.impl.ClientProtoUtils; import org.apache.raft.netty.NettyRpcProxy; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; @@ -75,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc { } /** Constructs a netty server with the given port. */ - public NettyRpcService(int port, RaftServer server) { + public NettyRpcService(int port, RaftServerImpl server) { this.raftService = new RequestDispatcher(server); this.id = server.getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java index 4958e9a..12cdf13 100644 --- a/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java +++ b/raft-netty/src/test/java/org/apache/raft/netty/MiniRaftClusterWithNetty.java @@ -26,7 +26,7 @@ import org.apache.raft.netty.server.NettyRpcService; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.impl.DelayLocalExecutionInjection; import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.util.NetUtils; import java.io.IOException; @@ -69,17 +69,17 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { } private static NettyRpcService newNettyRpcService( - RaftServer s, RaftConfiguration conf) { + RaftServerImpl s, RaftConfiguration conf) { final String address = getAddress(s.getId(), conf); final int port = NetUtils.newInetSocketAddress(address).getPort(); return new NettyRpcService(port, s); } private static Map<RaftPeer, NettyRpcService> initRpcServices( - Collection<RaftServer> servers, RaftConfiguration conf) { + Collection<RaftServerImpl> servers, RaftConfiguration conf) { final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>(); - for (RaftServer s : servers) { + for (RaftServerImpl s : servers) { final NettyRpcService rpc = newNettyRpcService(s, conf); peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); } @@ -88,8 +88,8 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { } @Override - protected RaftServer setPeerRpc(RaftPeer peer) throws IOException { - final RaftServer s = servers.get(peer.getId()); + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { + final RaftServerImpl s = servers.get(peer.getId()); final NettyRpcService rpc = newNettyRpcService(s, conf); s.setServerRpc(rpc); return s; @@ -97,7 +97,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { @Override protected Collection<RaftPeer> addNewPeers( - Collection<RaftPeer> newPeers, Collection<RaftServer> newServers, + Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, boolean startService) throws IOException { return addNewPeers(initRpcServices(newServers, conf), newServers, startService); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java index fb75b7b..cba991d 100644 --- a/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java +++ b/raft-netty/src/test/java/org/apache/raft/netty/TestRaftWithNetty.java @@ -21,7 +21,7 @@ import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import org.apache.raft.client.RaftClient; import org.apache.raft.server.impl.BlockRequestHandlingInjection; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.util.RaftUtils; import org.junit.Test; @@ -29,7 +29,7 @@ import java.io.IOException; public class TestRaftWithNetty extends RaftBasicTests { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java index 61b3c92..20f2d8f 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/FollowerState.java @@ -25,15 +25,15 @@ import org.slf4j.Logger; * Used when the peer is a follower. Used to track the election timeout. */ class FollowerState extends Daemon { - static final Logger LOG = RaftServer.LOG; + static final Logger LOG = RaftServerImpl.LOG; - private final RaftServer server; + private final RaftServerImpl server; private volatile Timestamp lastRpcTime = new Timestamp(); private volatile boolean monitorRunning = true; private volatile boolean inLogSync = false; - FollowerState(RaftServer server) { + FollowerState(RaftServerImpl server) { this.server = server; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java index 10e901f..8552029 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderElection.java @@ -67,7 +67,7 @@ class LeaderElection extends Daemon { } } - private final RaftServer server; + private final RaftServerImpl server; private ExecutorCompletionService<RequestVoteReplyProto> service; private ExecutorService executor; private volatile boolean running; @@ -79,7 +79,7 @@ class LeaderElection extends Daemon { private final RaftConfiguration conf; private final Collection<RaftPeer> others; - LeaderElection(RaftServer server) { + LeaderElection(RaftServerImpl server) { this.server = server; conf = server.getRaftConf(); others = conf.getOtherPeers(server.getId()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java index 79d0d9e..39dc400 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LeaderState.java @@ -49,7 +49,7 @@ import static org.apache.raft.server.impl.LeaderState.StateUpdateEventType.*; * corresponding log entries are committed */ public class LeaderState { - private static final Logger LOG = RaftServer.LOG; + private static final Logger LOG = RaftServerImpl.LOG; enum StateUpdateEventType { STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS @@ -74,7 +74,7 @@ public class LeaderState { static final StateUpdateEvent STAGING_PROGRESS_EVENT = new StateUpdateEvent(StateUpdateEventType.STAGINGPROGRESS, -1); - private final RaftServer server; + private final RaftServerImpl server; private final RaftLog raftLog; private final long currentTerm; private volatile ConfigurationStagingState stagingState; @@ -94,7 +94,7 @@ public class LeaderState { private final int snapshotChunkMaxSize; private final int syncInterval; - LeaderState(RaftServer server, RaftProperties properties) { + LeaderState(RaftServerImpl server, RaftProperties properties) { this.server = server; stagingCatchupGap = properties.getInt( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java index 143da28..3b18f13 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppender.java @@ -46,9 +46,9 @@ import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; * A daemon thread appending log entries to a follower peer. */ public class LogAppender extends Daemon { - public static final Logger LOG = RaftServer.LOG; + public static final Logger LOG = RaftServerImpl.LOG; - protected final RaftServer server; + protected final RaftServerImpl server; private final LeaderState leaderState; protected final RaftLog raftLog; protected final FollowerInfo follower; @@ -59,7 +59,7 @@ public class LogAppender extends Daemon { private volatile boolean sending = true; - public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo f) { + public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) { this.follower = f; this.server = server; this.leaderState = leaderState; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java index b5ed775..d77faff 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/LogAppenderFactory.java @@ -18,13 +18,13 @@ package org.apache.raft.server.impl; public interface LogAppenderFactory { - LogAppender getLogAppender(RaftServer server, LeaderState state, - FollowerInfo f); + LogAppender getLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f); class SynchronousLogAppenderFactory implements LogAppenderFactory { @Override - public LogAppender getLogAppender(RaftServer server, LeaderState state, - FollowerInfo f) { + public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f) { return new LogAppender(server, state, f); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java index ad6ecef..32f127e 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/PendingRequests.java @@ -30,14 +30,14 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; class PendingRequests { - private static final Logger LOG = RaftServer.LOG; + private static final Logger LOG = RaftServerImpl.LOG; private PendingRequest pendingSetConf; - private final RaftServer server; + private final RaftServerImpl server; private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>(); private PendingRequest last = null; - PendingRequests(RaftServer server) { + PendingRequests(RaftServerImpl server) { this.server = server; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java deleted file mode 100644 index c1bf4a9..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServer.java +++ /dev/null @@ -1,749 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.*; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.protocol.RaftServerProtocol; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.CodeInjectionForTesting; -import org.apache.raft.util.LifeCycle; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.OptionalLong; -import java.util.concurrent.CompletableFuture; - -import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; -import static org.apache.raft.util.LifeCycle.State.*; - -public class RaftServer implements RaftServerProtocol, Closeable { - public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class); - - private static final String CLASS_NAME = RaftServer.class.getSimpleName(); - static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; - static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; - static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; - - - private final int minTimeoutMs; - private final int maxTimeoutMs; - - private final LifeCycle lifeCycle; - private final ServerState state; - private final StateMachine stateMachine; - private final RaftProperties properties; - private volatile Role role; - - /** used when the peer is follower, to monitor election timeout */ - private volatile FollowerState heartbeatMonitor; - - /** used when the peer is candidate, to request votes from other peers */ - private volatile LeaderElection electionDaemon; - - /** used when the peer is leader */ - private volatile LeaderState leaderState; - - private RaftServerRpc serverRpc; - - private final LogAppenderFactory appenderFactory; - - public RaftServer(String id, RaftConfiguration raftConf, - RaftProperties properties, StateMachine stateMachine) throws IOException { - this.lifeCycle = new LifeCycle(id); - minTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); - maxTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); - Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, - "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); - this.properties = properties; - this.stateMachine = stateMachine; - this.state = new ServerState(id, raftConf, properties, this, stateMachine); - appenderFactory = initAppenderFactory(); - } - - public int getMinTimeoutMs() { - return minTimeoutMs; - } - - public int getMaxTimeoutMs() { - return maxTimeoutMs; - } - - public int getRandomTimeoutMs() { - return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs); - } - - public StateMachine getStateMachine() { - return this.stateMachine; - } - - public LogAppenderFactory getLogAppenderFactory() { - return appenderFactory; - } - - private LogAppenderFactory initAppenderFactory() { - Class<? extends LogAppenderFactory> factoryClass = properties.getClass( - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, - LogAppenderFactory.class); - return RaftUtils.newInstance(factoryClass); - } - - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration conf) { - this.state.setInitialConf(conf); - } - - public void setServerRpc(RaftServerRpc serverRpc) { - this.serverRpc = serverRpc; - // add peers into rpc service - RaftConfiguration conf = getRaftConf(); - if (conf != null) { - addPeersToRPC(conf.getPeers()); - } - } - - public RaftServerRpc getServerRpc() { - return serverRpc; - } - - public void start() { - lifeCycle.transition(STARTING); - state.start(); - RaftConfiguration conf = getRaftConf(); - if (conf != null && conf.contains(getId())) { - LOG.debug("{} starts as a follower", getId()); - startAsFollower(); - } else { - LOG.debug("{} starts with initializing state", getId()); - startInitializing(); - } - } - - /** - * The peer belongs to the current configuration, should start as a follower - */ - private void startAsFollower() { - role = Role.FOLLOWER; - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - - serverRpc.start(); - lifeCycle.transition(RUNNING); - } - - /** - * The peer does not have any configuration (maybe it will later be included - * in some configuration). Start still as a follower but will not vote or - * start election. - */ - private void startInitializing() { - role = Role.FOLLOWER; - // do not start heartbeatMonitoring - serverRpc.start(); - } - - public ServerState getState() { - return this.state; - } - - public String getId() { - return getState().getSelfId(); - } - - public RaftConfiguration getRaftConf() { - return getState().getRaftConf(); - } - - @Override - public void close() { - lifeCycle.checkStateAndClose(() -> { - try { - shutdownHeartbeatMonitor(); - shutdownElectionDaemon(); - shutdownLeaderState(); - - serverRpc.shutdown(); - state.close(); - } catch (Exception ignored) { - LOG.warn("Failed to kill " + state.getSelfId(), ignored); - } - }); - } - - public boolean isAlive() { - return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); - } - - public boolean isFollower() { - return role == Role.FOLLOWER; - } - - public boolean isCandidate() { - return role == Role.CANDIDATE; - } - - public boolean isLeader() { - return role == Role.LEADER; - } - - Role getRole() { - return role; - } - - /** - * Change the server state to Follower if necessary - * @param newTerm The new term. - * @param sync We will call {@link ServerState#persistMetadata()} if this is - * set to true and term/votedFor get updated. - * @return if the term/votedFor should be updated to the new term - * @throws IOException if term/votedFor persistence failed. - */ - synchronized boolean changeToFollower(long newTerm, boolean sync) - throws IOException { - final Role old = role; - role = Role.FOLLOWER; - - boolean metadataUpdated = false; - if (newTerm > state.getCurrentTerm()) { - state.setCurrentTerm(newTerm); - state.resetLeaderAndVotedFor(); - metadataUpdated = true; - } - - if (old == Role.LEADER) { - assert leaderState != null; - shutdownLeaderState(); - } else if (old == Role.CANDIDATE) { - shutdownElectionDaemon(); - } - - if (old != Role.FOLLOWER) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - - if (metadataUpdated && sync) { - state.persistMetadata(); - } - return metadataUpdated; - } - - private synchronized void shutdownLeaderState() { - final LeaderState leader = leaderState; - if (leader != null) { - leader.stop(); - } - leaderState = null; - // TODO: make sure that StateMachineUpdater has applied all transactions that have context - } - - private void shutdownElectionDaemon() { - final LeaderElection election = electionDaemon; - if (election != null) { - election.stopRunning(); - // no need to interrupt the election thread - } - electionDaemon = null; - } - - synchronized void changeToLeader() { - Preconditions.checkState(isCandidate()); - shutdownElectionDaemon(); - role = Role.LEADER; - state.becomeLeader(); - // start sending AppendEntries RPC to followers - leaderState = new LeaderState(this, properties); - leaderState.start(); - } - - private void shutdownHeartbeatMonitor() { - final FollowerState hm = heartbeatMonitor; - if (hm != null) { - hm.stopRunning(); - hm.interrupt(); - } - heartbeatMonitor = null; - } - - synchronized void changeToCandidate() { - Preconditions.checkState(isFollower()); - shutdownHeartbeatMonitor(); - role = Role.CANDIDATE; - // start election - electionDaemon = new LeaderElection(this); - electionDaemon.start(); - } - - @Override - public String toString() { - return role + " " + state + " " + lifeCycle.getCurrentState(); - } - - /** - * @return null if the server is in leader state. - */ - CompletableFuture<RaftClientReply> checkLeaderState( - RaftClientRequest request) { - if (!isLeader()) { - NotLeaderException exception = generateNotLeaderException(); - CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.complete(new RaftClientReply(request, exception)); - return future; - } - return null; - } - - NotLeaderException generateNotLeaderException() { - if (lifeCycle.getCurrentState() != RUNNING) { - return new NotLeaderException(getId(), null, null); - } - String leaderId = state.getLeaderId(); - if (leaderId == null || leaderId.equals(state.getSelfId())) { - // No idea about who is the current leader. Or the peer is the current - // leader, but it is about to step down - RaftPeer suggestedLeader = state.getRaftConf() - .getRandomPeer(state.getSelfId()); - leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); - } - RaftConfiguration conf = getRaftConf(); - Collection<RaftPeer> peers = conf.getPeers(); - return new NotLeaderException(getId(), conf.getPeer(leaderId), - peers.toArray(new RaftPeer[peers.size()])); - } - - /** - * Handle a normal update request from client. - */ - public CompletableFuture<RaftClientReply> appendTransaction( - RaftClientRequest request, TransactionContext entry) - throws RaftException { - LOG.debug("{}: receive client request({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply; - - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - // append the message to its local log - final long entryIndex; - try { - entryIndex = state.applyLog(entry); - } catch (IOException e) { - throw new RaftException(e); - } - - // put the request into the pending queue - pending = leaderState.addPendingRequest(entryIndex, request, entry); - leaderState.notifySenders(); - } - return pending.getFuture(); - } - - /** - * Handle a raft configuration change request from client. - */ - public CompletableFuture<RaftClientReply> setConfiguration( - SetConfigurationRequest request) throws IOException { - LOG.debug("{}: receive setConfiguration({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftConfiguration current = getRaftConf(); - // make sure there is no other raft reconfiguration in progress - if (!current.isStable() || leaderState.inStagingState() || - !state.isCurrentConfCommitted()) { - throw new ReconfigurationInProgressException( - "Reconfiguration is already in progress: " + current); - } - - // return true if the new configuration is the same with the current one - if (current.hasNoChange(peersInNewConf)) { - pending = leaderState.returnNoConfChange(request); - return pending.getFuture(); - } - - // add new peers into the rpc service - addPeersToRPC(Arrays.asList(peersInNewConf)); - // add staging state into the leaderState - pending = leaderState.startSetConfiguration(request); - } - return pending.getFuture(); - } - - private boolean shouldWithholdVotes() { - return isLeader() || (isFollower() && state.hasLeader() - && heartbeatMonitor.shouldWithholdVotes()); - } - - /** - * check if the remote peer is not included in the current conf - * and should shutdown. should shutdown if all the following stands: - * 1. this is a leader - * 2. current conf is stable and has been committed - * 3. candidate id is not included in conf - * 4. candidate's last entry's index < conf's index - */ - private boolean shouldSendShutdown(String candidateId, - TermIndex candidateLastEntry) { - return isLeader() - && getRaftConf().isStable() - && getState().isConfCommitted() - && !getRaftConf().containsInConf(candidateId) - && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() - && !leaderState.isBootStrappingPeer(candidateId); - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) - throws IOException { - final String candidateId = r.getServerRequest().getRequestorId(); - return requestVote(candidateId, r.getCandidateTerm(), - ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); - } - - private RequestVoteReplyProto requestVote(String candidateId, - long candidateTerm, TermIndex candidateLastEntry) throws IOException { - CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), - candidateId, candidateTerm, candidateLastEntry); - LOG.debug("{}: receive requestVote({}, {}, {})", - getId(), candidateId, candidateTerm, candidateLastEntry); - lifeCycle.assertCurrentState(RUNNING); - - boolean voteGranted = false; - boolean shouldShutdown = false; - final RequestVoteReplyProto reply; - synchronized (this) { - if (shouldWithholdVotes()) { - LOG.info("{} Withhold vote from server {} with term {}. " + - "This server:{}, last rpc time from leader {} is {}", getId(), - candidateId, candidateTerm, this, this.getState().getLeaderId(), - (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1)); - } else if (state.recognizeCandidate(candidateId, candidateTerm)) { - boolean termUpdated = changeToFollower(candidateTerm, false); - // see Section 5.4.1 Election restriction - if (state.isLogUpToDate(candidateLastEntry)) { - heartbeatMonitor.updateLastRpcTime(false); - state.grantVote(candidateId); - voteGranted = true; - } - if (termUpdated || voteGranted) { - state.persistMetadata(); // sync metafile - } - } - if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) { - shouldShutdown = true; - } - reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), - voteGranted, state.getCurrentTerm(), shouldShutdown); - if (LOG.isDebugEnabled()) { - LOG.debug("{} replies to vote request: {}. Peer's state: {}", - getId(), ProtoUtils.toString(reply), state); - } - } - return reply; - } - - private void validateEntries(long expectedTerm, TermIndex previous, - LogEntryProto... entries) { - if (entries != null && entries.length > 0) { - final long index0 = entries[0].getIndex(); - - if (previous == null || previous.getTerm() == 0) { - Preconditions.checkArgument(index0 == 0, - "Unexpected Index: previous is null but entries[%s].getIndex()=%s", - 0, index0); - } else { - Preconditions.checkArgument(previous.getIndex() == index0 - 1, - "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", - previous, 0, index0); - } - - for (int i = 0; i < entries.length; i++) { - final long t = entries[i].getTerm(); - Preconditions.checkArgument(expectedTerm >= t, - "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", - i, t, expectedTerm); - - final long indexi = entries[i].getIndex(); - Preconditions.checkArgument(indexi == index0 + i, - "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", - i, indexi, index0); - } - } - } - - @Override - public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) - throws IOException { - // TODO avoid converting list to array - final LogEntryProto[] entries = r.getEntriesList() - .toArray(new LogEntryProto[r.getEntriesCount()]); - final TermIndex previous = r.hasPreviousLog() ? - ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(r.getServerRequest().getRequestorId(), - r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), - entries); - } - - private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, - TermIndex previous, long leaderCommit, boolean initializing, - LogEntryProto... entries) throws IOException { - CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, entries); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, - ServerProtoUtils.toString(entries)); - } - lifeCycle.assertCurrentState(STARTING, RUNNING); - - try { - validateEntries(leaderTerm, previous, entries); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - - final long currentTerm; - long nextIndex = state.getLog().getNextIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: do not recognize leader. Reply: {}", - getId(), ProtoUtils.toString(reply)); - } - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // We need to check if "previous" is in the local peer. Note that it is - // possible that "previous" is covered by the latest snapshot: e.g., - // it's possible there's no log entries outside of the latest snapshot. - // However, it is not possible that "previous" index is smaller than the - // last index included in snapshot. This is because indices <= snapshot's - // last index should have been committed. - if (previous != null && !containPrevious(previous)) { - final AppendEntriesReplyProto reply = - ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), - currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); - LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", - getId(), previous, ServerProtoUtils.toString(reply)); - return reply; - } - - state.getLog().append(entries); - state.updateConfiguration(entries); - state.updateStatemachine(leaderCommit, currentTerm); - } - if (entries != null && entries.length > 0) { - try { - state.getLog().logSync(); - } catch (InterruptedException e) { - throw new InterruptedIOException("logSync got interrupted"); - } - nextIndex = entries[entries.length - 1].getIndex() + 1; - } - synchronized (this) { - if (lifeCycle.getCurrentState() == RUNNING && isFollower() - && getState().getCurrentTerm() == currentTerm) { - // reset election timer to avoid punishing the leader for our own - // long disk writes - heartbeatMonitor.updateLastRpcTime(false); - } - } - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, SUCCESS); - LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(), - ServerProtoUtils.toString(reply)); - return reply; - } - - private boolean containPrevious(TermIndex previous) { - LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", - getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); - return state.getLog().contains(previous) - || (state.getLatestSnapshot() != null - && state.getLatestSnapshot().getTermIndex().equals(previous)) - || (state.getLatestInstalledSnapshot() != null) - && state.getLatestInstalledSnapshot().equals(previous); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - final String leaderId = request.getServerRequest().getRequestorId(); - CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); - LOG.debug("{}: receive installSnapshot({})", getId(), request); - - lifeCycle.assertCurrentState(STARTING, RUNNING); - - final long currentTerm; - final long leaderTerm = request.getLeaderTerm(); - final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( - request.getTermIndex()); - final long lastIncludedIndex = lastTermIndex.getIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final InstallSnapshotReplyProto reply = ServerProtoUtils - .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, - request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); - LOG.debug("{}: do not recognize leader for installing snapshot." + - " Reply: {}", getId(), reply); - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // Check and append the snapshot chunk. We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - Preconditions.checkState( - state.getLog().getNextIndex() <= lastIncludedIndex, - "%s log's next id is %s, last included index in snapshot is %s", - getId(), state.getLog().getNextIndex(), lastIncludedIndex); - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - // update the committed index - // re-load the state machine if this is the last chunk - if (request.getDone()) { - state.reloadStateMachine(lastIncludedIndex, leaderTerm); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(false); - } - } - if (request.getDone()) { - LOG.info("{}: successfully install the whole snapshot-{}", getId(), - lastIncludedIndex); - } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), - currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); - } - - AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, - String targetId, TermIndex previous, List<LogEntryProto> entries, - boolean initializing) { - return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, - leaderTerm, entries, state.getLog().getLastCommittedIndex(), - initializing, previous); - } - - synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( - String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, - List<FileChunkProto> chunks, boolean done) { - OptionalLong totalSize = snapshot.getFiles().stream() - .mapToLong(FileInfo::getFileSize).reduce(Long::sum); - assert totalSize.isPresent(); - return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, - requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), - chunks, totalSize.getAsLong(), done); - } - - synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, - long term, TermIndex lastEntry) { - return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, - lastEntry); - } - - public synchronized void submitLocalSyncEvent() { - if (isLeader() && leaderState != null) { - leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT); - } - } - - public void addPeersToRPC(Iterable<RaftPeer> peers) { - serverRpc.addPeers(peers); - } - - synchronized void replyPendingRequest(long logIndex, - CompletableFuture<Message> message) { - if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logIndex, message); - } - } - - TransactionContext getTransactionContext(long index) { - if (leaderState != null) { // is leader and is running - return leaderState.getTransactionContext(index); - } - return null; - } - - public RaftProperties getProperties() { - return this.properties; - } -}
