http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java new file mode 100644 index 0000000..6778683 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java @@ -0,0 +1,749 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server.impl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.raft.conf.RaftProperties; +import org.apache.raft.protocol.*; +import org.apache.raft.server.RaftServerConfigKeys; +import org.apache.raft.server.protocol.RaftServerProtocol; +import org.apache.raft.server.protocol.TermIndex; +import org.apache.raft.server.storage.FileInfo; +import org.apache.raft.shaded.proto.RaftProtos.*; +import org.apache.raft.statemachine.SnapshotInfo; +import org.apache.raft.statemachine.StateMachine; +import org.apache.raft.statemachine.TransactionContext; +import org.apache.raft.util.CodeInjectionForTesting; +import org.apache.raft.util.LifeCycle; +import org.apache.raft.util.ProtoUtils; +import org.apache.raft.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; +import static org.apache.raft.util.LifeCycle.State.*; + +public class RaftServerImpl implements RaftServerProtocol, Closeable { + public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); + + private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName(); + static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; + static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; + static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; + + + private final int minTimeoutMs; + private final int maxTimeoutMs; + + private final LifeCycle lifeCycle; + private final ServerState state; + private final StateMachine stateMachine; + private final RaftProperties properties; + private volatile Role role; + + /** used when the peer is follower, to monitor election timeout */ + private volatile FollowerState heartbeatMonitor; + + /** used when the peer is candidate, to request votes from other peers */ + private volatile LeaderElection electionDaemon; + + /** used when the peer is leader */ + private volatile LeaderState leaderState; + + private RaftServerRpc serverRpc; + + private final LogAppenderFactory appenderFactory; + + public RaftServerImpl(String id, RaftConfiguration raftConf, + RaftProperties properties, StateMachine stateMachine) throws IOException { + this.lifeCycle = new LifeCycle(id); + minTimeoutMs = properties.getInt( + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); + maxTimeoutMs = properties.getInt( + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, + RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); + Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, + "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); + this.properties = properties; + this.stateMachine = stateMachine; + this.state = new ServerState(id, raftConf, properties, this, stateMachine); + appenderFactory = initAppenderFactory(); + } + + public int getMinTimeoutMs() { + return minTimeoutMs; + } + + public int getMaxTimeoutMs() { + return maxTimeoutMs; + } + + public int getRandomTimeoutMs() { + return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs); + } + + public StateMachine getStateMachine() { + return this.stateMachine; + } + + public LogAppenderFactory getLogAppenderFactory() { + return appenderFactory; + } + + private LogAppenderFactory initAppenderFactory() { + Class<? extends LogAppenderFactory> factoryClass = properties.getClass( + RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, + RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, + LogAppenderFactory.class); + return RaftUtils.newInstance(factoryClass); + } + + /** + * Used by tests to set initial raft configuration with correct port bindings. + */ + @VisibleForTesting + public void setInitialConf(RaftConfiguration conf) { + this.state.setInitialConf(conf); + } + + public void setServerRpc(RaftServerRpc serverRpc) { + this.serverRpc = serverRpc; + // add peers into rpc service + RaftConfiguration conf = getRaftConf(); + if (conf != null) { + addPeersToRPC(conf.getPeers()); + } + } + + public RaftServerRpc getServerRpc() { + return serverRpc; + } + + public void start() { + lifeCycle.transition(STARTING); + state.start(); + RaftConfiguration conf = getRaftConf(); + if (conf != null && conf.contains(getId())) { + LOG.debug("{} starts as a follower", getId()); + startAsFollower(); + } else { + LOG.debug("{} starts with initializing state", getId()); + startInitializing(); + } + } + + /** + * The peer belongs to the current configuration, should start as a follower + */ + private void startAsFollower() { + role = Role.FOLLOWER; + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + + serverRpc.start(); + lifeCycle.transition(RUNNING); + } + + /** + * The peer does not have any configuration (maybe it will later be included + * in some configuration). Start still as a follower but will not vote or + * start election. + */ + private void startInitializing() { + role = Role.FOLLOWER; + // do not start heartbeatMonitoring + serverRpc.start(); + } + + public ServerState getState() { + return this.state; + } + + public String getId() { + return getState().getSelfId(); + } + + public RaftConfiguration getRaftConf() { + return getState().getRaftConf(); + } + + @Override + public void close() { + lifeCycle.checkStateAndClose(() -> { + try { + shutdownHeartbeatMonitor(); + shutdownElectionDaemon(); + shutdownLeaderState(); + + serverRpc.shutdown(); + state.close(); + } catch (Exception ignored) { + LOG.warn("Failed to kill " + state.getSelfId(), ignored); + } + }); + } + + public boolean isAlive() { + return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); + } + + public boolean isFollower() { + return role == Role.FOLLOWER; + } + + public boolean isCandidate() { + return role == Role.CANDIDATE; + } + + public boolean isLeader() { + return role == Role.LEADER; + } + + Role getRole() { + return role; + } + + /** + * Change the server state to Follower if necessary + * @param newTerm The new term. + * @param sync We will call {@link ServerState#persistMetadata()} if this is + * set to true and term/votedFor get updated. + * @return if the term/votedFor should be updated to the new term + * @throws IOException if term/votedFor persistence failed. + */ + synchronized boolean changeToFollower(long newTerm, boolean sync) + throws IOException { + final Role old = role; + role = Role.FOLLOWER; + + boolean metadataUpdated = false; + if (newTerm > state.getCurrentTerm()) { + state.setCurrentTerm(newTerm); + state.resetLeaderAndVotedFor(); + metadataUpdated = true; + } + + if (old == Role.LEADER) { + assert leaderState != null; + shutdownLeaderState(); + } else if (old == Role.CANDIDATE) { + shutdownElectionDaemon(); + } + + if (old != Role.FOLLOWER) { + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + } + + if (metadataUpdated && sync) { + state.persistMetadata(); + } + return metadataUpdated; + } + + private synchronized void shutdownLeaderState() { + final LeaderState leader = leaderState; + if (leader != null) { + leader.stop(); + } + leaderState = null; + // TODO: make sure that StateMachineUpdater has applied all transactions that have context + } + + private void shutdownElectionDaemon() { + final LeaderElection election = electionDaemon; + if (election != null) { + election.stopRunning(); + // no need to interrupt the election thread + } + electionDaemon = null; + } + + synchronized void changeToLeader() { + Preconditions.checkState(isCandidate()); + shutdownElectionDaemon(); + role = Role.LEADER; + state.becomeLeader(); + // start sending AppendEntries RPC to followers + leaderState = new LeaderState(this, properties); + leaderState.start(); + } + + private void shutdownHeartbeatMonitor() { + final FollowerState hm = heartbeatMonitor; + if (hm != null) { + hm.stopRunning(); + hm.interrupt(); + } + heartbeatMonitor = null; + } + + synchronized void changeToCandidate() { + Preconditions.checkState(isFollower()); + shutdownHeartbeatMonitor(); + role = Role.CANDIDATE; + // start election + electionDaemon = new LeaderElection(this); + electionDaemon.start(); + } + + @Override + public String toString() { + return role + " " + state + " " + lifeCycle.getCurrentState(); + } + + /** + * @return null if the server is in leader state. + */ + CompletableFuture<RaftClientReply> checkLeaderState( + RaftClientRequest request) { + if (!isLeader()) { + NotLeaderException exception = generateNotLeaderException(); + CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); + future.complete(new RaftClientReply(request, exception)); + return future; + } + return null; + } + + NotLeaderException generateNotLeaderException() { + if (lifeCycle.getCurrentState() != RUNNING) { + return new NotLeaderException(getId(), null, null); + } + String leaderId = state.getLeaderId(); + if (leaderId == null || leaderId.equals(state.getSelfId())) { + // No idea about who is the current leader. Or the peer is the current + // leader, but it is about to step down + RaftPeer suggestedLeader = state.getRaftConf() + .getRandomPeer(state.getSelfId()); + leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); + } + RaftConfiguration conf = getRaftConf(); + Collection<RaftPeer> peers = conf.getPeers(); + return new NotLeaderException(getId(), conf.getPeer(leaderId), + peers.toArray(new RaftPeer[peers.size()])); + } + + /** + * Handle a normal update request from client. + */ + public CompletableFuture<RaftClientReply> appendTransaction( + RaftClientRequest request, TransactionContext entry) + throws RaftException { + LOG.debug("{}: receive client request({})", getId(), request); + lifeCycle.assertCurrentState(RUNNING); + CompletableFuture<RaftClientReply> reply; + + final PendingRequest pending; + synchronized (this) { + reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + // append the message to its local log + final long entryIndex; + try { + entryIndex = state.applyLog(entry); + } catch (IOException e) { + throw new RaftException(e); + } + + // put the request into the pending queue + pending = leaderState.addPendingRequest(entryIndex, request, entry); + leaderState.notifySenders(); + } + return pending.getFuture(); + } + + /** + * Handle a raft configuration change request from client. + */ + public CompletableFuture<RaftClientReply> setConfiguration( + SetConfigurationRequest request) throws IOException { + LOG.debug("{}: receive setConfiguration({})", getId(), request); + lifeCycle.assertCurrentState(RUNNING); + CompletableFuture<RaftClientReply> reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); + final PendingRequest pending; + synchronized (this) { + reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + final RaftConfiguration current = getRaftConf(); + // make sure there is no other raft reconfiguration in progress + if (!current.isStable() || leaderState.inStagingState() || + !state.isCurrentConfCommitted()) { + throw new ReconfigurationInProgressException( + "Reconfiguration is already in progress: " + current); + } + + // return true if the new configuration is the same with the current one + if (current.hasNoChange(peersInNewConf)) { + pending = leaderState.returnNoConfChange(request); + return pending.getFuture(); + } + + // add new peers into the rpc service + addPeersToRPC(Arrays.asList(peersInNewConf)); + // add staging state into the leaderState + pending = leaderState.startSetConfiguration(request); + } + return pending.getFuture(); + } + + private boolean shouldWithholdVotes() { + return isLeader() || (isFollower() && state.hasLeader() + && heartbeatMonitor.shouldWithholdVotes()); + } + + /** + * check if the remote peer is not included in the current conf + * and should shutdown. should shutdown if all the following stands: + * 1. this is a leader + * 2. current conf is stable and has been committed + * 3. candidate id is not included in conf + * 4. candidate's last entry's index < conf's index + */ + private boolean shouldSendShutdown(String candidateId, + TermIndex candidateLastEntry) { + return isLeader() + && getRaftConf().isStable() + && getState().isConfCommitted() + && !getRaftConf().containsInConf(candidateId) + && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() + && !leaderState.isBootStrappingPeer(candidateId); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) + throws IOException { + final String candidateId = r.getServerRequest().getRequestorId(); + return requestVote(candidateId, r.getCandidateTerm(), + ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); + } + + private RequestVoteReplyProto requestVote(String candidateId, + long candidateTerm, TermIndex candidateLastEntry) throws IOException { + CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), + candidateId, candidateTerm, candidateLastEntry); + LOG.debug("{}: receive requestVote({}, {}, {})", + getId(), candidateId, candidateTerm, candidateLastEntry); + lifeCycle.assertCurrentState(RUNNING); + + boolean voteGranted = false; + boolean shouldShutdown = false; + final RequestVoteReplyProto reply; + synchronized (this) { + if (shouldWithholdVotes()) { + LOG.info("{} Withhold vote from server {} with term {}. " + + "This server:{}, last rpc time from leader {} is {}", getId(), + candidateId, candidateTerm, this, this.getState().getLeaderId(), + (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1)); + } else if (state.recognizeCandidate(candidateId, candidateTerm)) { + boolean termUpdated = changeToFollower(candidateTerm, false); + // see Section 5.4.1 Election restriction + if (state.isLogUpToDate(candidateLastEntry)) { + heartbeatMonitor.updateLastRpcTime(false); + state.grantVote(candidateId); + voteGranted = true; + } + if (termUpdated || voteGranted) { + state.persistMetadata(); // sync metafile + } + } + if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) { + shouldShutdown = true; + } + reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), + voteGranted, state.getCurrentTerm(), shouldShutdown); + if (LOG.isDebugEnabled()) { + LOG.debug("{} replies to vote request: {}. Peer's state: {}", + getId(), ProtoUtils.toString(reply), state); + } + } + return reply; + } + + private void validateEntries(long expectedTerm, TermIndex previous, + LogEntryProto... entries) { + if (entries != null && entries.length > 0) { + final long index0 = entries[0].getIndex(); + + if (previous == null || previous.getTerm() == 0) { + Preconditions.checkArgument(index0 == 0, + "Unexpected Index: previous is null but entries[%s].getIndex()=%s", + 0, index0); + } else { + Preconditions.checkArgument(previous.getIndex() == index0 - 1, + "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", + previous, 0, index0); + } + + for (int i = 0; i < entries.length; i++) { + final long t = entries[i].getTerm(); + Preconditions.checkArgument(expectedTerm >= t, + "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", + i, t, expectedTerm); + + final long indexi = entries[i].getIndex(); + Preconditions.checkArgument(indexi == index0 + i, + "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", + i, indexi, index0); + } + } + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) + throws IOException { + // TODO avoid converting list to array + final LogEntryProto[] entries = r.getEntriesList() + .toArray(new LogEntryProto[r.getEntriesCount()]); + final TermIndex previous = r.hasPreviousLog() ? + ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; + return appendEntries(r.getServerRequest().getRequestorId(), + r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), + entries); + } + + private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, + TermIndex previous, long leaderCommit, boolean initializing, + LogEntryProto... entries) throws IOException { + CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), + leaderId, leaderTerm, previous, leaderCommit, initializing, entries); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), + leaderId, leaderTerm, previous, leaderCommit, initializing, + ServerProtoUtils.toString(entries)); + } + lifeCycle.assertCurrentState(STARTING, RUNNING); + + try { + validateEntries(leaderTerm, previous, entries); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } + + final long currentTerm; + long nextIndex = state.getLog().getNextIndex(); + synchronized (this) { + final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { + final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: do not recognize leader. Reply: {}", + getId(), ProtoUtils.toString(reply)); + } + return reply; + } + changeToFollower(leaderTerm, true); + state.setLeader(leaderId); + + if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + } + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(true); + } + + // We need to check if "previous" is in the local peer. Note that it is + // possible that "previous" is covered by the latest snapshot: e.g., + // it's possible there's no log entries outside of the latest snapshot. + // However, it is not possible that "previous" index is smaller than the + // last index included in snapshot. This is because indices <= snapshot's + // last index should have been committed. + if (previous != null && !containPrevious(previous)) { + final AppendEntriesReplyProto reply = + ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), + currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); + LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", + getId(), previous, ServerProtoUtils.toString(reply)); + return reply; + } + + state.getLog().append(entries); + state.updateConfiguration(entries); + state.updateStatemachine(leaderCommit, currentTerm); + } + if (entries != null && entries.length > 0) { + try { + state.getLog().logSync(); + } catch (InterruptedException e) { + throw new InterruptedIOException("logSync got interrupted"); + } + nextIndex = entries[entries.length - 1].getIndex() + 1; + } + synchronized (this) { + if (lifeCycle.getCurrentState() == RUNNING && isFollower() + && getState().getCurrentTerm() == currentTerm) { + // reset election timer to avoid punishing the leader for our own + // long disk writes + heartbeatMonitor.updateLastRpcTime(false); + } + } + final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + leaderId, getId(), currentTerm, nextIndex, SUCCESS); + LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(), + ServerProtoUtils.toString(reply)); + return reply; + } + + private boolean containPrevious(TermIndex previous) { + LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", + getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); + return state.getLog().contains(previous) + || (state.getLatestSnapshot() != null + && state.getLatestSnapshot().getTermIndex().equals(previous)) + || (state.getLatestInstalledSnapshot() != null) + && state.getLatestInstalledSnapshot().equals(previous); + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + final String leaderId = request.getServerRequest().getRequestorId(); + CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); + LOG.debug("{}: receive installSnapshot({})", getId(), request); + + lifeCycle.assertCurrentState(STARTING, RUNNING); + + final long currentTerm; + final long leaderTerm = request.getLeaderTerm(); + final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( + request.getTermIndex()); + final long lastIncludedIndex = lastTermIndex.getIndex(); + synchronized (this) { + final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); + currentTerm = state.getCurrentTerm(); + if (!recognized) { + final InstallSnapshotReplyProto reply = ServerProtoUtils + .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, + request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + LOG.debug("{}: do not recognize leader for installing snapshot." + + " Reply: {}", getId(), reply); + return reply; + } + changeToFollower(leaderTerm, true); + state.setLeader(leaderId); + + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(true); + } + + // Check and append the snapshot chunk. We simply put this in lock + // considering a follower peer requiring a snapshot installation does not + // have a lot of requests + Preconditions.checkState( + state.getLog().getNextIndex() <= lastIncludedIndex, + "%s log's next id is %s, last included index in snapshot is %s", + getId(), state.getLog().getNextIndex(), lastIncludedIndex); + + //TODO: We should only update State with installed snapshot once the request is done. + state.installSnapshot(request); + + // update the committed index + // re-load the state machine if this is the last chunk + if (request.getDone()) { + state.reloadStateMachine(lastIncludedIndex, leaderTerm); + } + if (lifeCycle.getCurrentState() == RUNNING) { + heartbeatMonitor.updateLastRpcTime(false); + } + } + if (request.getDone()) { + LOG.info("{}: successfully install the whole snapshot-{}", getId(), + lastIncludedIndex); + } + return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), + currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); + } + + AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, + String targetId, TermIndex previous, List<LogEntryProto> entries, + boolean initializing) { + return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, + leaderTerm, entries, state.getLog().getLastCommittedIndex(), + initializing, previous); + } + + synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( + String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, + List<FileChunkProto> chunks, boolean done) { + OptionalLong totalSize = snapshot.getFiles().stream() + .mapToLong(FileInfo::getFileSize).reduce(Long::sum); + assert totalSize.isPresent(); + return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, + requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), + chunks, totalSize.getAsLong(), done); + } + + synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, + long term, TermIndex lastEntry) { + return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, + lastEntry); + } + + public synchronized void submitLocalSyncEvent() { + if (isLeader() && leaderState != null) { + leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT); + } + } + + public void addPeersToRPC(Iterable<RaftPeer> peers) { + serverRpc.addPeers(peers); + } + + synchronized void replyPendingRequest(long logIndex, + CompletableFuture<Message> message) { + if (isLeader() && leaderState != null) { // is leader and is running + leaderState.replyPendingRequest(logIndex, message); + } + } + + TransactionContext getTransactionContext(long index) { + if (leaderState != null) { // is leader and is running + return leaderState.getTransactionContext(index); + } + return null; + } + + public RaftProperties getProperties() { + return this.properties; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java index cc8651d..b897afd 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java @@ -44,10 +44,10 @@ import java.util.concurrent.ExecutionException; public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol { static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class); - private final RaftServer server; + private final RaftServerImpl server; private final StateMachine stateMachine; - public RequestDispatcher(RaftServer server) { + public RequestDispatcher(RaftServerImpl server) { this.server = server; this.stateMachine = server.getStateMachine(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java index 6680175..c91968c 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java @@ -40,7 +40,7 @@ import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY */ public class ServerState implements Closeable { private final String selfId; - private final RaftServer server; + private final RaftServerImpl server; /** Raft log */ private final RaftLog log; /** Raft configuration */ @@ -74,7 +74,7 @@ public class ServerState implements Closeable { private TermIndex latestInstalledSnapshot; ServerState(String id, RaftConfiguration conf, RaftProperties prop, - RaftServer server, StateMachine stateMachine) throws IOException { + RaftServerImpl server, StateMachine stateMachine) throws IOException { this.selfId = id; this.server = server; configurationManager = new ConfigurationManager(conf); @@ -128,7 +128,7 @@ public class ServerState implements Closeable { * note we do not apply log entries to the state machine here since we do not * know whether they have been committed. */ - private RaftLog initLog(String id, RaftProperties prop, RaftServer server, + private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server, long lastIndexInSnapshot) throws IOException { final RaftLog log; if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, @@ -273,7 +273,7 @@ public class ServerState implements Closeable { public void setRaftConf(long logIndex, RaftConfiguration conf) { configurationManager.addConfiguration(logIndex, conf); - RaftServer.LOG.info("{}: successfully update the configuration {}", + RaftServerImpl.LOG.info("{}: successfully update the configuration {}", getSelfId(), conf); } @@ -306,7 +306,7 @@ public class ServerState implements Closeable { @Override public void close() throws IOException { stateMachineUpdater.stop(); - RaftServer.LOG.info("{} closes. The last applied log index is {}", + RaftServerImpl.LOG.info("{} closes. The last applied log index is {}", getSelfId(), getLastAppliedIndex()); storage.close(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java index 06fa221..f85639b 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java @@ -57,7 +57,7 @@ class StateMachineUpdater implements Runnable { private final RaftProperties properties; private final StateMachine stateMachine; - private final RaftServer server; + private final RaftServerImpl server; private final RaftLog raftLog; private volatile long lastAppliedIndex; @@ -69,7 +69,7 @@ class StateMachineUpdater implements Runnable { private final Thread updater; private volatile State state = State.RUNNING; - StateMachineUpdater(StateMachine stateMachine, RaftServer server, + StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server, RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) { this.properties = properties; this.stateMachine = stateMachine; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java index 1837e94..6cef212 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java @@ -20,7 +20,7 @@ package org.apache.raft.server.storage; import com.google.common.base.Preconditions; import org.apache.raft.conf.RaftProperties; import org.apache.raft.io.nativeio.NativeIO; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.LogSegment.SegmentFileInfo; import org.apache.raft.server.storage.RaftLogCache.TruncationSegments; @@ -56,7 +56,7 @@ class RaftLogWorker implements Runnable { private final RaftStorage storage; private LogOutputStream out; - private final RaftServer raftServer; + private final RaftServerImpl raftServer; /** * The number of entries that have been written into the LogOutputStream but @@ -72,8 +72,8 @@ class RaftLogWorker implements Runnable { private final RaftProperties properties; - RaftLogWorker(RaftServer raftServer, RaftStorage storage, - RaftProperties properties) { + RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage, + RaftProperties properties) { this.raftServer = raftServer; this.storage = storage; this.properties = properties; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java index 9c55491..293e1a4 100644 --- a/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java +++ b/raft-server/src/main/java/org/apache/raft/server/storage/SegmentedRaftLog.java @@ -22,7 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.Charsets; import org.apache.raft.conf.RaftProperties; import org.apache.raft.server.impl.ConfigurationManager; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerConstants; import org.apache.raft.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; @@ -100,8 +100,8 @@ public class SegmentedRaftLog extends RaftLog { private final RaftLogWorker fileLogWorker; private final long segmentMaxSize; - public SegmentedRaftLog(String selfId, RaftServer server, RaftStorage storage, - long lastIndexInSnapshot, RaftProperties properties) throws IOException { + public SegmentedRaftLog(String selfId, RaftServerImpl server, RaftStorage storage, + long lastIndexInSnapshot, RaftProperties properties) throws IOException { super(selfId); this.storage = storage; this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java index 4f0871f..45cec15 100644 --- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java +++ b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java @@ -26,7 +26,7 @@ import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.impl.DelayLocalExecutionInjection; import org.apache.raft.server.impl.RaftConfiguration; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerRpc; import org.apache.raft.server.storage.MemoryRaftLog; import org.apache.raft.server.storage.RaftLog; @@ -72,7 +72,7 @@ public abstract class MiniRaftCluster { super(ids, properties, formatted); } - protected abstract RaftServer setPeerRpc(RaftPeer peer) throws IOException; + protected abstract RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException; @Override protected void setPeerRpc() throws IOException { @@ -133,7 +133,7 @@ public abstract class MiniRaftCluster { protected RaftConfiguration conf; protected final RaftProperties properties; private final String testBaseDir; - protected final Map<String, RaftServer> servers = + protected final Map<String, RaftServerImpl> servers = Collections.synchronizedMap(new LinkedHashMap<>()); public MiniRaftCluster(String[] ids, RaftProperties properties, @@ -152,7 +152,7 @@ public abstract class MiniRaftCluster { LOG.info("peers = " + peers.keySet()); conf = RaftConfiguration.newBuilder().setConf(peers.keySet()).build(); for (Map.Entry<RaftPeer, RPC> entry : peers.entrySet()) { - final RaftServer server = servers.get(entry.getKey().getId()); + final RaftServerImpl server = servers.get(entry.getKey().getId()); server.setInitialConf(conf); server.setServerRpc(entry.getValue()); } @@ -160,7 +160,7 @@ public abstract class MiniRaftCluster { public void start() { LOG.info("Starting " + getClass().getSimpleName()); - servers.values().forEach(RaftServer::start); + servers.values().forEach(RaftServerImpl::start); } /** @@ -173,8 +173,8 @@ public abstract class MiniRaftCluster { } public final void restart(boolean format) throws IOException { - servers.values().stream().filter(RaftServer::isAlive) - .forEach(RaftServer::close); + servers.values().stream().filter(RaftServerImpl::isAlive) + .forEach(RaftServerImpl::close); List<String> idList = new ArrayList<>(servers.keySet()); for (String id : idList) { servers.remove(id); @@ -197,16 +197,16 @@ public abstract class MiniRaftCluster { return conf; } - private RaftServer newRaftServer(String id, RaftConfiguration conf, - boolean format) { - final RaftServer s; + private RaftServerImpl newRaftServer(String id, RaftConfiguration conf, + boolean format) { + final RaftServerImpl s; try { final String dirStr = testBaseDir + id; if (format) { formatDir(dirStr); } properties.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr); - s = new RaftServer(id, conf, properties, getStateMachine4Test(properties)); + s = new RaftServerImpl(id, conf, properties, getStateMachine4Test(properties)); } catch (IOException e) { throw new RuntimeException(e); } @@ -224,20 +224,20 @@ public abstract class MiniRaftCluster { public abstract RaftClientRequestSender getRaftClientRequestSender(); protected <RPC extends RaftServerRpc> Collection<RaftPeer> addNewPeers( - Map<RaftPeer, RPC> newPeers, Collection<RaftServer> newServers, + Map<RaftPeer, RPC> newPeers, Collection<RaftServerImpl> newServers, boolean startService) throws IOException { for (Map.Entry<RaftPeer, RPC> entry : newPeers.entrySet()) { - RaftServer server = servers.get(entry.getKey().getId()); + RaftServerImpl server = servers.get(entry.getKey().getId()); server.setServerRpc(entry.getValue()); } if (startService) { - newServers.forEach(RaftServer::start); + newServers.forEach(RaftServerImpl::start); } return new ArrayList<>(newPeers.keySet()); } protected abstract Collection<RaftPeer> addNewPeers( - Collection<RaftPeer> newPeers, Collection<RaftServer> newServers, + Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, boolean startService) throws IOException; public PeerChanges addNewPeers(int number, boolean startNewPeer) @@ -254,9 +254,9 @@ public abstract class MiniRaftCluster { } // create and add new RaftServers - final List<RaftServer> newServers = new ArrayList<>(ids.length); + final List<RaftServerImpl> newServers = new ArrayList<>(ids.length); for (RaftPeer p : newPeers) { - RaftServer newServer = newRaftServer(p.getId(), conf, true); + RaftServerImpl newServer = newRaftServer(p.getId(), conf, true); Preconditions.checkArgument(!servers.containsKey(p.getId())); servers.put(p.getId(), newServer); newServers.add(newServer); @@ -273,12 +273,12 @@ public abstract class MiniRaftCluster { } public void startServer(String id) { - RaftServer server = servers.get(id); + RaftServerImpl server = servers.get(id); assert server != null; server.start(); } - private RaftPeer getPeer(RaftServer s) { + private RaftPeer getPeer(RaftServerImpl s) { return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()); } @@ -295,7 +295,7 @@ public abstract class MiniRaftCluster { peers.remove(leader); removedPeers.add(leader); } - List<RaftServer> followers = getFollowers(); + List<RaftServerImpl> followers = getFollowers(); for (int i = 0, removed = 0; i < followers.size() && removed < (removeLeader ? number - 1 : number); i++) { RaftPeer toRemove = getPeer(followers.get(i)); @@ -317,7 +317,7 @@ public abstract class MiniRaftCluster { public String printServers() { StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServer s : servers.values()) { + for (RaftServerImpl s : servers.values()) { b.append(" "); b.append(s).append("\n"); } @@ -326,7 +326,7 @@ public abstract class MiniRaftCluster { public String printAllLogs() { StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n"); - for (RaftServer s : servers.values()) { + for (RaftServerImpl s : servers.values()) { b.append(" "); b.append(s).append("\n"); @@ -339,8 +339,8 @@ public abstract class MiniRaftCluster { return b.toString(); } - public RaftServer getLeader() { - final List<RaftServer> leaders = new ArrayList<>(); + public RaftServerImpl getLeader() { + final List<RaftServerImpl> leaders = new ArrayList<>(); servers.values().stream() .filter(s -> s.isAlive() && s.isLeader()) .forEach(s -> { @@ -367,21 +367,21 @@ public abstract class MiniRaftCluster { } public boolean isLeader(String leaderId) throws InterruptedException { - final RaftServer leader = getLeader(); + final RaftServerImpl leader = getLeader(); return leader != null && leader.getId().equals(leaderId); } - public List<RaftServer> getFollowers() { + public List<RaftServerImpl> getFollowers() { return servers.values().stream() .filter(s -> s.isAlive() && s.isFollower()) .collect(Collectors.toList()); } - public Collection<RaftServer> getServers() { + public Collection<RaftServerImpl> getServers() { return servers.values(); } - public RaftServer getServer(String id) { + public RaftServerImpl getServer(String id) { return servers.get(id); } @@ -398,8 +398,8 @@ public abstract class MiniRaftCluster { public void shutdown() { LOG.info("Stopping " + getClass().getSimpleName()); - servers.values().stream().filter(RaftServer::isAlive) - .forEach(RaftServer::close); + servers.values().stream().filter(RaftServerImpl::isAlive) + .forEach(RaftServerImpl::close); if (ExitUtils.isTerminated()) { LOG.error("Test resulted in an unexpected exit", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java index 921e063..ed40bde 100644 --- a/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java +++ b/raft-server/src/test/java/org/apache/raft/RaftBasicTests.java @@ -20,7 +20,7 @@ package org.apache.raft; import org.apache.raft.RaftTestUtil.SimpleMessage; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.junit.*; import org.junit.rules.Timeout; import org.slf4j.Logger; @@ -80,7 +80,7 @@ public abstract class RaftBasicTests { public void testBasicAppendEntries() throws Exception { LOG.info("Running testBasicAppendEntries"); final MiniRaftCluster cluster = getCluster(); - RaftServer leader = waitForLeader(cluster); + RaftServerImpl leader = waitForLeader(cluster); final long term = leader.getState().getCurrentTerm(); final String killed = cluster.getFollowers().get(3).getId(); cluster.killServer(killed); @@ -96,7 +96,7 @@ public abstract class RaftBasicTests { Thread.sleep(cluster.getMaxTimeout() + 100); LOG.info(cluster.printAllLogs()); - cluster.getServers().stream().filter(RaftServer::isAlive) + cluster.getServers().stream().filter(RaftServerImpl::isAlive) .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE)) .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages)); } @@ -174,7 +174,7 @@ public abstract class RaftBasicTests { lastStep = n; count++; - RaftServer leader = cluster.getLeader(); + RaftServerImpl leader = cluster.getLeader(); if (leader != null) { final String oldLeader = leader.getId(); LOG.info("Block all requests sent by leader " + oldLeader); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java index 8a249e9..195cbec 100644 --- a/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java +++ b/raft-server/src/test/java/org/apache/raft/RaftNotLeaderExceptionBaseTest.java @@ -25,7 +25,7 @@ import org.apache.raft.client.impl.RaftClientImpl; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftClientRequest; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.util.RaftUtils; @@ -42,7 +42,7 @@ import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM; public abstract class RaftNotLeaderExceptionBaseTest { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java index 92bf5c4..461dd15 100644 --- a/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java +++ b/raft-server/src/test/java/org/apache/raft/RaftTestUtil.java @@ -23,7 +23,7 @@ import org.apache.raft.protocol.Message; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.impl.BlockRequestHandlingInjection; import org.apache.raft.server.impl.DelayLocalExecutionInjection; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.shaded.com.google.protobuf.ByteString; import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; @@ -46,11 +46,11 @@ import static org.apache.raft.util.ProtoUtils.toByteString; public class RaftTestUtil { static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class); - public static RaftServer waitForLeader(MiniRaftCluster cluster) + public static RaftServerImpl waitForLeader(MiniRaftCluster cluster) throws InterruptedException { final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1; LOG.info(cluster.printServers()); - RaftServer leader = null; + RaftServerImpl leader = null; for(int i = 0; leader == null && i < 10; i++) { Thread.sleep(sleepTime); leader = cluster.getLeader(); @@ -59,11 +59,11 @@ public class RaftTestUtil { return leader; } - public static RaftServer waitForLeader(MiniRaftCluster cluster, - final String leaderId) throws InterruptedException { + public static RaftServerImpl waitForLeader(MiniRaftCluster cluster, + final String leaderId) throws InterruptedException { LOG.info(cluster.printServers()); for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) { - RaftServer currLeader = cluster.getLeader(); + RaftServerImpl currLeader = cluster.getLeader(); if (LOG.isDebugEnabled()) { LOG.debug("try enforcing leader to " + leaderId + " but " + (currLeader == null? "no leader for this round" @@ -72,14 +72,14 @@ public class RaftTestUtil { } LOG.info(cluster.printServers()); - final RaftServer leader = cluster.getLeader(); + final RaftServerImpl leader = cluster.getLeader(); Assert.assertEquals(leaderId, leader.getId()); return leader; } public static String waitAndKillLeader(MiniRaftCluster cluster, boolean expectLeader) throws InterruptedException { - final RaftServer leader = waitForLeader(cluster); + final RaftServerImpl leader = waitForLeader(cluster); if (!expectLeader) { Assert.assertNull(leader); } else { @@ -105,11 +105,11 @@ public class RaftTestUtil { return idxExpected == expectedMessages.length; } - public static void assertLogEntries(Collection<RaftServer> servers, + public static void assertLogEntries(Collection<RaftServerImpl> servers, SimpleMessage... expectedMessages) { final int size = servers.size(); final long count = servers.stream() - .filter(RaftServer::isAlive) + .filter(RaftServerImpl::isAlive) .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE)) .filter(e -> logEntriesContains(e, expectedMessages)) .count(); @@ -269,7 +269,7 @@ public class RaftTestUtil { return newLeader; } - public static void blockQueueAndSetDelay(Collection<RaftServer> servers, + public static void blockQueueAndSetDelay(Collection<RaftServerImpl> servers, DelayLocalExecutionInjection injection, String leaderId, int delayMs, long maxTimeout) throws InterruptedException { // block reqeusts sent to leader if delayMs > 0 http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java index 0980e93..7f7de9a 100644 --- a/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java +++ b/raft-server/src/test/java/org/apache/raft/server/impl/BlockRequestHandlingInjection.java @@ -29,9 +29,9 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co new BlockRequestHandlingInjection(); static { - CodeInjectionForTesting.put(RaftServer.REQUEST_VOTE, INSTANCE); - CodeInjectionForTesting.put(RaftServer.APPEND_ENTRIES, INSTANCE); - CodeInjectionForTesting.put(RaftServer.INSTALL_SNAPSHOT, INSTANCE); + CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE); + CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE); + CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE); } public static BlockRequestHandlingInjection getInstance() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java index 30f1e15..8a5af69 100644 --- a/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java +++ b/raft-server/src/test/java/org/apache/raft/server/impl/RaftReconfigurationBaseTest.java @@ -54,7 +54,7 @@ import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBody public abstract class RaftReconfigurationBaseTest { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } @@ -221,7 +221,7 @@ public abstract class RaftReconfigurationBaseTest { // check configuration manager's internal state // each reconf will generate two configurations: (old, new) and (new) - cluster.getServers().stream().filter(RaftServer::isAlive) + cluster.getServers().stream().filter(RaftServerImpl::isAlive) .forEach(server -> { ConfigurationManager confManager = (ConfigurationManager) Whitebox.getInternalState(server.getState(), http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java index b30ddc9..5103fca 100644 --- a/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java +++ b/raft-server/src/test/java/org/apache/raft/server/impl/RaftServerTestUtil.java @@ -46,7 +46,7 @@ public class RaftServerTestUtil { int deadIncluded = 0; final RaftConfiguration current = RaftConfiguration.newBuilder() .setConf(peers).setLogEntryIndex(0).build(); - for (RaftServer server : cluster.getServers()) { + for (RaftServerImpl server : cluster.getServers()) { if (deadPeers != null && deadPeers.contains(server.getId())) { if (current.containsInConf(server.getId())) { deadIncluded++; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java index 360fe1e..7414872 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -21,7 +21,7 @@ import org.apache.raft.MiniRaftCluster; import org.apache.raft.client.RaftClientRequestSender; import org.apache.raft.conf.RaftProperties; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +70,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { setRpcServers(getServers()); } - private void setRpcServers(Collection<RaftServer> newServers) { + private void setRpcServers(Collection<RaftServerImpl> newServers) { newServers.forEach(s -> s.setServerRpc( new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply))); } @@ -88,7 +88,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @Override public void restartServer(String id, boolean format) throws IOException { super.restartServer(id, format); - RaftServer s = getServer(id); + RaftServerImpl s = getServer(id); addPeersToRpc(Collections.singletonList(conf.getPeer(id))); s.setServerRpc(new SimulatedServerRpc(s, serverRequestReply, client2serverRequestReply)); @@ -97,11 +97,11 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { @Override public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServer> newServers, boolean startService) { + Collection<RaftServerImpl> newServers, boolean startService) { addPeersToRpc(newPeers); setRpcServers(newServers); if (startService) { - newServers.forEach(RaftServer::start); + newServers.forEach(RaftServerImpl::start); } return newPeers; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java index ed522d4..93e3f5c 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java @@ -22,7 +22,7 @@ import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftClientRequest; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.proto.RaftProtos.*; @@ -39,14 +39,14 @@ import java.util.concurrent.TimeUnit; public class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); - private final RaftServer server; + private final RaftServerImpl server; private final RequestDispatcher dispatcher; private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler; private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; private final ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(true).build()); - public SimulatedServerRpc(RaftServer server, + public SimulatedServerRpc(RaftServerImpl server, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { this.server = server; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java index 669226a..faa9dd8 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/TestRaftWithSimulatedRpc.java @@ -21,7 +21,7 @@ import org.apache.log4j.Level; import org.apache.raft.RaftBasicTests; import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.util.RaftUtils; import java.io.IOException; @@ -29,7 +29,7 @@ import java.util.concurrent.ThreadLocalRandom; public class TestRaftWithSimulatedRpc extends RaftBasicTests { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java index 41ae9af..721d12c 100644 --- a/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java +++ b/raft-server/src/test/java/org/apache/raft/statemachine/RaftSnapshotBaseTest.java @@ -25,7 +25,7 @@ import org.apache.raft.client.RaftClient; import org.apache.raft.conf.RaftProperties; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.impl.RaftServerTestUtil; import org.apache.raft.server.simulation.RequestHandler; import org.apache.raft.server.storage.RaftLog; @@ -52,7 +52,7 @@ import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM; public abstract class RaftSnapshotBaseTest { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/22f3ee0b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java index c9dd99c..5892c65 100644 --- a/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java +++ b/raft-server/src/test/java/org/apache/raft/statemachine/TestStateMachine.java @@ -25,7 +25,7 @@ import org.apache.raft.conf.RaftProperties; import org.apache.raft.protocol.Message; import org.apache.raft.protocol.RaftClientRequest; import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.impl.RaftServer; +import org.apache.raft.server.impl.RaftServerImpl; import org.apache.raft.server.simulation.MiniRaftClusterWithSimulatedRpc; import org.apache.raft.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.raft.util.RaftUtils; @@ -50,7 +50,7 @@ import static org.junit.Assert.*; */ public class TestStateMachine { static { - RaftUtils.setLogLevel(RaftServer.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } @@ -161,14 +161,14 @@ public class TestStateMachine { // TODO: there eshould be a better way to ensure all data is replicated and applied Thread.sleep(cluster.getMaxTimeout() + 100); - for (RaftServer raftServer : cluster.getServers()) { + for (RaftServerImpl raftServer : cluster.getServers()) { SMTransactionContext sm = ((SMTransactionContext)raftServer.getStateMachine()); sm.rethrowIfException(); assertEquals(numTrx, sm.numApplied.get()); } // check leader - RaftServer raftServer = cluster.getLeader(); + RaftServerImpl raftServer = cluster.getLeader(); // assert every transaction has obtained context in leader SMTransactionContext sm = ((SMTransactionContext)raftServer.getStateMachine()); List<Long> ll = sm.applied.stream().collect(Collectors.toList());
