http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java deleted file mode 100644 index 4b8c442..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java +++ /dev/null @@ -1,813 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.*; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.FileInfo; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.CodeInjectionForTesting; -import org.apache.raft.util.LifeCycle; -import org.apache.raft.util.ProtoUtils; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.OptionalLong; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; -import static org.apache.raft.util.LifeCycle.State.*; - -public class RaftServerImpl implements RaftServer { - public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); - - private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName(); - static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; - static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; - static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; - - - /** Role of raft peer */ - enum Role { - LEADER, CANDIDATE, FOLLOWER - } - - private final int minTimeoutMs; - private final int maxTimeoutMs; - - private final LifeCycle lifeCycle; - private final ServerState state; 
- private final StateMachine stateMachine; - private final RaftProperties properties; - private volatile Role role; - - /** used when the peer is follower, to monitor election timeout */ - private volatile FollowerState heartbeatMonitor; - - /** used when the peer is candidate, to request votes from other peers */ - private volatile LeaderElection electionDaemon; - - /** used when the peer is leader */ - private volatile LeaderState leaderState; - - private RaftServerRpc serverRpc; - - private final LogAppenderFactory appenderFactory; - - public RaftServerImpl(String id, RaftConfiguration raftConf, - RaftProperties properties, StateMachine stateMachine) throws IOException { - this.lifeCycle = new LifeCycle(id); - minTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT); - maxTimeoutMs = properties.getInt( - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY, - RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT); - Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs, - "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); - this.properties = properties; - this.stateMachine = stateMachine; - this.state = new ServerState(id, raftConf, properties, this, stateMachine); - appenderFactory = initAppenderFactory(); - } - - int getMinTimeoutMs() { - return minTimeoutMs; - } - - int getMaxTimeoutMs() { - return maxTimeoutMs; - } - - int getRandomTimeoutMs() { - return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs); - } - - @Override - public StateMachine getStateMachine() { - return this.stateMachine; - } - - public LogAppenderFactory getLogAppenderFactory() { - return appenderFactory; - } - - private LogAppenderFactory initAppenderFactory() { - Class<? 
extends LogAppenderFactory> factoryClass = properties.getClass( - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT, - LogAppenderFactory.class); - return RaftUtils.newInstance(factoryClass); - } - - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration conf) { - this.state.setInitialConf(conf); - } - - @Override - public void setServerRpc(RaftServerRpc serverRpc) { - this.serverRpc = serverRpc; - // add peers into rpc service - RaftConfiguration conf = getRaftConf(); - if (conf != null) { - serverRpc.addPeers(conf.getPeers()); - } - } - - public RaftServerRpc getServerRpc() { - return serverRpc; - } - - @Override - public void start() { - lifeCycle.transition(STARTING); - state.start(); - RaftConfiguration conf = getRaftConf(); - if (conf != null && conf.contains(getId())) { - LOG.debug("{} starts as a follower", getId()); - startAsFollower(); - } else { - LOG.debug("{} starts with initializing state", getId()); - startInitializing(); - } - } - - /** - * The peer belongs to the current configuration, should start as a follower - */ - private void startAsFollower() { - role = Role.FOLLOWER; - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - - serverRpc.start(); - lifeCycle.transition(RUNNING); - } - - /** - * The peer does not have any configuration (maybe it will later be included - * in some configuration). Start still as a follower but will not vote or - * start election. 
- */ - private void startInitializing() { - role = Role.FOLLOWER; - // do not start heartbeatMonitoring - serverRpc.start(); - } - - public ServerState getState() { - return this.state; - } - - @Override - public String getId() { - return getState().getSelfId(); - } - - RaftConfiguration getRaftConf() { - return getState().getRaftConf(); - } - - @Override - public void close() { - lifeCycle.checkStateAndClose(() -> { - try { - shutdownHeartbeatMonitor(); - shutdownElectionDaemon(); - shutdownLeaderState(); - - serverRpc.close(); - state.close(); - } catch (Exception ignored) { - LOG.warn("Failed to kill " + state.getSelfId(), ignored); - } - }); - } - - @VisibleForTesting - public boolean isAlive() { - return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); - } - - public boolean isFollower() { - return role == Role.FOLLOWER; - } - - public boolean isCandidate() { - return role == Role.CANDIDATE; - } - - public boolean isLeader() { - return role == Role.LEADER; - } - - /** - * Change the server state to Follower if necessary - * @param newTerm The new term. - * @param sync We will call {@link ServerState#persistMetadata()} if this is - * set to true and term/votedFor get updated. - * @return if the term/votedFor should be updated to the new term - * @throws IOException if term/votedFor persistence failed. 
- */ - synchronized boolean changeToFollower(long newTerm, boolean sync) - throws IOException { - final Role old = role; - role = Role.FOLLOWER; - - boolean metadataUpdated = false; - if (newTerm > state.getCurrentTerm()) { - state.setCurrentTerm(newTerm); - state.resetLeaderAndVotedFor(); - metadataUpdated = true; - } - - if (old == Role.LEADER) { - assert leaderState != null; - shutdownLeaderState(); - } else if (old == Role.CANDIDATE) { - shutdownElectionDaemon(); - } - - if (old != Role.FOLLOWER) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - - if (metadataUpdated && sync) { - state.persistMetadata(); - } - return metadataUpdated; - } - - private synchronized void shutdownLeaderState() { - final LeaderState leader = leaderState; - if (leader != null) { - leader.stop(); - } - leaderState = null; - // TODO: make sure that StateMachineUpdater has applied all transactions that have context - } - - private void shutdownElectionDaemon() { - final LeaderElection election = electionDaemon; - if (election != null) { - election.stopRunning(); - // no need to interrupt the election thread - } - electionDaemon = null; - } - - synchronized void changeToLeader() { - Preconditions.checkState(isCandidate()); - shutdownElectionDaemon(); - role = Role.LEADER; - state.becomeLeader(); - // start sending AppendEntries RPC to followers - leaderState = new LeaderState(this, properties); - leaderState.start(); - } - - private void shutdownHeartbeatMonitor() { - final FollowerState hm = heartbeatMonitor; - if (hm != null) { - hm.stopRunning(); - hm.interrupt(); - } - heartbeatMonitor = null; - } - - synchronized void changeToCandidate() { - Preconditions.checkState(isFollower()); - shutdownHeartbeatMonitor(); - role = Role.CANDIDATE; - // start election - electionDaemon = new LeaderElection(this); - electionDaemon.start(); - } - - @Override - public String toString() { - return role + " " + state + " " + lifeCycle.getCurrentState(); - } - - /** - * 
@return null if the server is in leader state. - */ - private CompletableFuture<RaftClientReply> checkLeaderState( - RaftClientRequest request) { - if (!isLeader()) { - NotLeaderException exception = generateNotLeaderException(); - CompletableFuture<RaftClientReply> future = new CompletableFuture<>(); - future.complete(new RaftClientReply(request, exception)); - return future; - } - return null; - } - - NotLeaderException generateNotLeaderException() { - if (lifeCycle.getCurrentState() != RUNNING) { - return new NotLeaderException(getId(), null, null); - } - String leaderId = state.getLeaderId(); - if (leaderId == null || leaderId.equals(state.getSelfId())) { - // No idea about who is the current leader. Or the peer is the current - // leader, but it is about to step down - RaftPeer suggestedLeader = state.getRaftConf() - .getRandomPeer(state.getSelfId()); - leaderId = suggestedLeader == null ? null : suggestedLeader.getId(); - } - RaftConfiguration conf = getRaftConf(); - Collection<RaftPeer> peers = conf.getPeers(); - return new NotLeaderException(getId(), conf.getPeer(leaderId), - peers.toArray(new RaftPeer[peers.size()])); - } - - /** - * Handle a normal update request from client. 
- */ - private CompletableFuture<RaftClientReply> appendTransaction( - RaftClientRequest request, TransactionContext entry) - throws RaftException { - LOG.debug("{}: receive client request({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply; - - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - // append the message to its local log - final long entryIndex; - try { - entryIndex = state.applyLog(entry); - } catch (IOException e) { - throw new RaftException(e); - } - - // put the request into the pending queue - pending = leaderState.addPendingRequest(entryIndex, request, entry); - leaderState.notifySenders(); - } - return pending.getFuture(); - } - - @Override - public CompletableFuture<RaftClientReply> submitClientRequestAsync( - RaftClientRequest request) throws IOException { - // first check the server's leader state - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - // let the state machine handle read-only request from client - if (request.isReadOnly()) { - // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper, - // section 8 (last part) - return stateMachine.query(request); - } - - // TODO: this client request will not be added to pending requests - // until later which means that any failure in between will leave partial state in the - // state machine. 
We should call cancelTransaction() for failed requests - TransactionContext entry = stateMachine.startTransaction(request); - if (entry.getException().isPresent()) { - throw RaftUtils.asIOException(entry.getException().get()); - } - - return appendTransaction(request, entry); - } - - @Override - public RaftClientReply submitClientRequest(RaftClientRequest request) - throws IOException { - return waitForReply(getId(), request, submitClientRequestAsync(request)); - } - - private static RaftClientReply waitForReply(String id, RaftClientRequest request, - CompletableFuture<RaftClientReply> future) throws IOException { - try { - return future.get(); - } catch (InterruptedException e) { - final String s = id + ": Interrupted when waiting for reply, request=" + request; - LOG.info(s, e); - throw RaftUtils.toInterruptedIOException(s, e); - } catch (ExecutionException e) { - final Throwable cause = e.getCause(); - if (cause == null) { - throw new IOException(e); - } - if (cause instanceof NotLeaderException) { - return new RaftClientReply(request, (NotLeaderException)cause); - } else { - throw RaftUtils.asIOException(cause); - } - } - } - - @Override - public RaftClientReply setConfiguration(SetConfigurationRequest request) - throws IOException { - return waitForReply(getId(), request, setConfigurationAsync(request)); - } - - /** - * Handle a raft configuration change request from client. 
- */ - @Override - public CompletableFuture<RaftClientReply> setConfigurationAsync( - SetConfigurationRequest request) throws IOException { - LOG.debug("{}: receive setConfiguration({})", getId(), request); - lifeCycle.assertCurrentState(RUNNING); - CompletableFuture<RaftClientReply> reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftPeer[] peersInNewConf = request.getPeersInNewConf(); - final PendingRequest pending; - synchronized (this) { - reply = checkLeaderState(request); - if (reply != null) { - return reply; - } - - final RaftConfiguration current = getRaftConf(); - // make sure there is no other raft reconfiguration in progress - if (!current.isStable() || leaderState.inStagingState() || - !state.isCurrentConfCommitted()) { - throw new ReconfigurationInProgressException( - "Reconfiguration is already in progress: " + current); - } - - // return true if the new configuration is the same with the current one - if (current.hasNoChange(peersInNewConf)) { - pending = leaderState.returnNoConfChange(request); - return pending.getFuture(); - } - - // add new peers into the rpc service - getServerRpc().addPeers(Arrays.asList(peersInNewConf)); - // add staging state into the leaderState - pending = leaderState.startSetConfiguration(request); - } - return pending.getFuture(); - } - - private boolean shouldWithholdVotes() { - return isLeader() || (isFollower() && state.hasLeader() - && heartbeatMonitor.shouldWithholdVotes()); - } - - /** - * check if the remote peer is not included in the current conf - * and should shutdown. should shutdown if all the following stands: - * 1. this is a leader - * 2. current conf is stable and has been committed - * 3. candidate id is not included in conf - * 4. 
candidate's last entry's index < conf's index - */ - private boolean shouldSendShutdown(String candidateId, - TermIndex candidateLastEntry) { - return isLeader() - && getRaftConf().isStable() - && getState().isConfCommitted() - && !getRaftConf().containsInConf(candidateId) - && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() - && !leaderState.isBootStrappingPeer(candidateId); - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) - throws IOException { - final String candidateId = r.getServerRequest().getRequestorId(); - return requestVote(candidateId, r.getCandidateTerm(), - ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); - } - - private RequestVoteReplyProto requestVote(String candidateId, - long candidateTerm, TermIndex candidateLastEntry) throws IOException { - CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), - candidateId, candidateTerm, candidateLastEntry); - LOG.debug("{}: receive requestVote({}, {}, {})", - getId(), candidateId, candidateTerm, candidateLastEntry); - lifeCycle.assertCurrentState(RUNNING); - - boolean voteGranted = false; - boolean shouldShutdown = false; - final RequestVoteReplyProto reply; - synchronized (this) { - if (shouldWithholdVotes()) { - LOG.info("{} Withhold vote from server {} with term {}. " + - "This server:{}, last rpc time from leader {} is {}", getId(), - candidateId, candidateTerm, this, this.getState().getLeaderId(), - (isFollower() ? 
heartbeatMonitor.getLastRpcTime() : -1)); - } else if (state.recognizeCandidate(candidateId, candidateTerm)) { - boolean termUpdated = changeToFollower(candidateTerm, false); - // see Section 5.4.1 Election restriction - if (state.isLogUpToDate(candidateLastEntry)) { - heartbeatMonitor.updateLastRpcTime(false); - state.grantVote(candidateId); - voteGranted = true; - } - if (termUpdated || voteGranted) { - state.persistMetadata(); // sync metafile - } - } - if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) { - shouldShutdown = true; - } - reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(), - voteGranted, state.getCurrentTerm(), shouldShutdown); - if (LOG.isDebugEnabled()) { - LOG.debug("{} replies to vote request: {}. Peer's state: {}", - getId(), ProtoUtils.toString(reply), state); - } - } - return reply; - } - - private void validateEntries(long expectedTerm, TermIndex previous, - LogEntryProto... entries) { - if (entries != null && entries.length > 0) { - final long index0 = entries[0].getIndex(); - - if (previous == null || previous.getTerm() == 0) { - Preconditions.checkArgument(index0 == 0, - "Unexpected Index: previous is null but entries[%s].getIndex()=%s", - 0, index0); - } else { - Preconditions.checkArgument(previous.getIndex() == index0 - 1, - "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", - previous, 0, index0); - } - - for (int i = 0; i < entries.length; i++) { - final long t = entries[i].getTerm(); - Preconditions.checkArgument(expectedTerm >= t, - "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", - i, t, expectedTerm); - - final long indexi = entries[i].getIndex(); - Preconditions.checkArgument(indexi == index0 + i, - "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", - i, indexi, index0); - } - } - } - - @Override - public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) - throws IOException { - // TODO avoid converting list to 
array - final LogEntryProto[] entries = r.getEntriesList() - .toArray(new LogEntryProto[r.getEntriesCount()]); - final TermIndex previous = r.hasPreviousLog() ? - ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(r.getServerRequest().getRequestorId(), - r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), - entries); - } - - private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm, - TermIndex previous, long leaderCommit, boolean initializing, - LogEntryProto... entries) throws IOException { - CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, entries); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(), - leaderId, leaderTerm, previous, leaderCommit, initializing, - ServerProtoUtils.toString(entries)); - } - lifeCycle.assertCurrentState(STARTING, RUNNING); - - try { - validateEntries(leaderTerm, previous, entries); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } - - final long currentTerm; - long nextIndex = state.getLog().getNextIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, NOT_LEADER); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: do not recognize leader. Reply: {}", - getId(), ProtoUtils.toString(reply)); - } - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // We need to check if "previous" is in the local peer. 
Note that it is - // possible that "previous" is covered by the latest snapshot: e.g., - // it's possible there's no log entries outside of the latest snapshot. - // However, it is not possible that "previous" index is smaller than the - // last index included in snapshot. This is because indices <= snapshot's - // last index should have been committed. - if (previous != null && !containPrevious(previous)) { - final AppendEntriesReplyProto reply = - ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), - currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY); - LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", - getId(), previous, ServerProtoUtils.toString(reply)); - return reply; - } - - state.getLog().append(entries); - state.updateConfiguration(entries); - state.updateStatemachine(leaderCommit, currentTerm); - } - if (entries != null && entries.length > 0) { - try { - state.getLog().logSync(); - } catch (InterruptedException e) { - throw new InterruptedIOException("logSync got interrupted"); - } - nextIndex = entries[entries.length - 1].getIndex() + 1; - } - synchronized (this) { - if (lifeCycle.getCurrentState() == RUNNING && isFollower() - && getState().getCurrentTerm() == currentTerm) { - // reset election timer to avoid punishing the leader for our own - // long disk writes - heartbeatMonitor.updateLastRpcTime(false); - } - } - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), currentTerm, nextIndex, SUCCESS); - LOG.debug("{}: succeeded to handle AppendEntries. 
Reply: {}", getId(), - ServerProtoUtils.toString(reply)); - return reply; - } - - private boolean containPrevious(TermIndex previous) { - LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}", - getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); - return state.getLog().contains(previous) - || (state.getLatestSnapshot() != null - && state.getLatestSnapshot().getTermIndex().equals(previous)) - || (state.getLatestInstalledSnapshot() != null) - && state.getLatestInstalledSnapshot().equals(previous); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - final String leaderId = request.getServerRequest().getRequestorId(); - CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); - LOG.debug("{}: receive installSnapshot({})", getId(), request); - - lifeCycle.assertCurrentState(STARTING, RUNNING); - - final long currentTerm; - final long leaderTerm = request.getLeaderTerm(); - final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( - request.getTermIndex()); - final long lastIncludedIndex = lastTermIndex.getIndex(); - synchronized (this) { - final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); - currentTerm = state.getCurrentTerm(); - if (!recognized) { - final InstallSnapshotReplyProto reply = ServerProtoUtils - .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm, - request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); - LOG.debug("{}: do not recognize leader for installing snapshot." + - " Reply: {}", getId(), reply); - return reply; - } - changeToFollower(leaderTerm, true); - state.setLeader(leaderId); - - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } - - // Check and append the snapshot chunk. 
We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - Preconditions.checkState( - state.getLog().getNextIndex() <= lastIncludedIndex, - "%s log's next id is %s, last included index in snapshot is %s", - getId(), state.getLog().getNextIndex(), lastIncludedIndex); - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - // update the committed index - // re-load the state machine if this is the last chunk - if (request.getDone()) { - state.reloadStateMachine(lastIncludedIndex, leaderTerm); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(false); - } - } - if (request.getDone()) { - LOG.info("{}: successfully install the whole snapshot-{}", getId(), - lastIncludedIndex); - } - return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), - currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS); - } - - AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm, - String targetId, TermIndex previous, List<LogEntryProto> entries, - boolean initializing) { - return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, - leaderTerm, entries, state.getLog().getLastCommittedIndex(), - initializing, previous); - } - - synchronized InstallSnapshotRequestProto createInstallSnapshotRequest( - String targetId, String requestId, int requestIndex, SnapshotInfo snapshot, - List<FileChunkProto> chunks, boolean done) { - OptionalLong totalSize = snapshot.getFiles().stream() - .mapToLong(FileInfo::getFileSize).reduce(Long::sum); - assert totalSize.isPresent(); - return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId, - requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(), - chunks, totalSize.getAsLong(), done); - } - - synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId, - long term, 
TermIndex lastEntry) { - return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term, - lastEntry); - } - - public synchronized void submitLocalSyncEvent() { - if (isLeader() && leaderState != null) { - leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT); - } - } - - synchronized void replyPendingRequest(long logIndex, - CompletableFuture<Message> message) { - if (isLeader() && leaderState != null) { // is leader and is running - leaderState.replyPendingRequest(logIndex, message); - } - } - - TransactionContext getTransactionContext(long index) { - if (leaderState != null) { // is leader and is running - return leaderState.getTransactionContext(index); - } - return null; - } - - public RaftProperties getProperties() { - return this.properties; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java deleted file mode 100644 index e30b979..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import org.apache.raft.client.impl.ClientProtoUtils; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.util.ProtoUtils; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM; -import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; - -/** Server proto utilities for internal use. 
*/ -public class ServerProtoUtils { - public static TermIndex toTermIndex(TermIndexProto p) { - return p == null? null: TermIndex.newTermIndex(p.getTerm(), p.getIndex()); - } - - public static TermIndexProto toTermIndexProto(TermIndex ti) { - return ti == null? null: TermIndexProto.newBuilder() - .setTerm(ti.getTerm()) - .setIndex(ti.getIndex()) - .build(); - } - - public static TermIndex toTermIndex(LogEntryProto entry) { - return entry == null ? null : - TermIndex.newTermIndex(entry.getTerm(), entry.getIndex()); - } - - public static String toString(LogEntryProto... entries) { - return entries == null? "null" - : entries.length == 0 ? "[]" - : entries.length == 1? "" + toTermIndex(entries[0]) - : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex) - .collect(Collectors.toList()); - } - - public static String toString(AppendEntriesReplyProto reply) { - return toString(reply.getServerReply()) + "," + reply.getResult() - + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm(); - } - - private static String toString(RaftRpcReplyProto reply) { - return reply.getRequestorId() + "->" + reply.getReplyId() + "," - + reply.getSuccess(); - } - - public static RaftConfigurationProto toRaftConfigurationProto( - RaftConfiguration conf) { - return RaftConfigurationProto.newBuilder() - .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf())) - .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf())) - .build(); - } - - public static RaftConfiguration toRaftConfiguration( - long index, RaftConfigurationProto proto) { - final RaftConfiguration.Builder b = RaftConfiguration.newBuilder() - .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList())) - .setLogEntryIndex(index); - if (proto.getOldPeersCount() > 0) { - b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList())); - } - return b.build(); - } - - public static LogEntryProto toLogEntryProto( - RaftConfiguration conf, long term, long index) { - return 
LogEntryProto.newBuilder() - .setTerm(term) - .setIndex(index) - .setConfigurationEntry(toRaftConfigurationProto(conf)) - .build(); - } - - public static RequestVoteReplyProto toRequestVoteReplyProto( - String requestorId, String replyId, boolean success, long term, - boolean shouldShutdown) { - final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder(); - b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, replyId, - DEFAULT_SEQNUM, success)) - .setTerm(term) - .setShouldShutdown(shouldShutdown); - return b.build(); - } - - public static RequestVoteRequestProto toRequestVoteRequestProto( - String requestorId, String replyId, long term, TermIndex lastEntry) { - final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) - .setCandidateTerm(term); - if (lastEntry != null) { - b.setCandidateLastEntry(toTermIndexProto(lastEntry)); - } - return b.build(); - } - - public static InstallSnapshotReplyProto toInstallSnapshotReplyProto( - String requestorId, String replyId, long term, int requestIndex, - InstallSnapshotResult result) { - final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, - replyId, DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS); - final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto - .newBuilder().setServerReply(rb).setTerm(term).setResult(result) - .setRequestIndex(requestIndex); - return builder.build(); - } - - public static InstallSnapshotRequestProto toInstallSnapshotRequestProto( - String requestorId, String replyId, String requestId, int requestIndex, - long term, TermIndex lastTermIndex, List<FileChunkProto> chunks, - long totalSize, boolean done) { - return InstallSnapshotRequestProto.newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) - 
.setRequestId(requestId) - .setRequestIndex(requestIndex) - // .setRaftConfiguration() TODO: save and pass RaftConfiguration - .setLeaderTerm(term) - .setTermIndex(toTermIndexProto(lastTermIndex)) - .addAllFileChunks(chunks) - .setTotalSize(totalSize) - .setDone(done).build(); - } - - public static AppendEntriesReplyProto toAppendEntriesReplyProto( - String requestorId, String replyId, long term, - long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) { - RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, - replyId, DEFAULT_SEQNUM, appendResult == SUCCESS); - final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder(); - b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex) - .setResult(appendResult); - return b.build(); - } - - public static AppendEntriesRequestProto toAppendEntriesRequestProto( - String requestorId, String replyId, long leaderTerm, - List<LogEntryProto> entries, long leaderCommit, boolean initializing, - TermIndex previous) { - final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto - .newBuilder() - .setServerRequest( - ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM)) - .setLeaderTerm(leaderTerm) - .setLeaderCommit(leaderCommit) - .setInitializing(initializing); - if (entries != null && !entries.isEmpty()) { - b.addAllEntries(entries); - } - - if (previous != null) { - b.setPreviousLog(toTermIndexProto(previous)); - } - return b.build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java deleted file mode 100644 index 8611101..0000000 --- 
a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.server.protocol.TermIndex; -import org.apache.raft.server.storage.*; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.ProtoUtils; - -import java.io.Closeable; -import java.io.IOException; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY; - -/** - * Common states of a raft peer. Protected by RaftServer's lock. 
- */ -public class ServerState implements Closeable { - private final String selfId; - private final RaftServerImpl server; - /** Raft log */ - private final RaftLog log; - /** Raft configuration */ - private final ConfigurationManager configurationManager; - /** The thread that applies committed log entries to the state machine */ - private final StateMachineUpdater stateMachineUpdater; - /** local storage for log and snapshot */ - private final RaftStorage storage; - private final SnapshotManager snapshotManager; - - /** - * Latest term server has seen. initialized to 0 on first boot, increases - * monotonically. - */ - private long currentTerm; - /** - * The server ID of the leader for this term. Null means either there is - * no leader for this term yet or this server does not know who it is yet. - */ - private String leaderId; - /** - * Candidate that this peer granted vote for in current term (or null if none) - */ - private String votedFor; - - /** - * Latest installed snapshot for this server. This maybe different than StateMachine's latest - * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately. - * Further, this will not get updated when SM does snapshots itself. 
- */ - private TermIndex latestInstalledSnapshot; - - ServerState(String id, RaftConfiguration conf, RaftProperties prop, - RaftServerImpl server, StateMachine stateMachine) throws IOException { - this.selfId = id; - this.server = server; - configurationManager = new ConfigurationManager(conf); - storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR); - snapshotManager = new SnapshotManager(storage, id); - - long lastApplied = initStatemachine(stateMachine, prop); - - leaderId = null; - log = initLog(id, prop, server, lastApplied); - RaftLog.Metadata metadata = log.loadMetadata(); - currentTerm = metadata.getTerm(); - votedFor = metadata.getVotedFor(); - - stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log, - lastApplied, prop); - } - - /** - * Used by tests to set initial raft configuration with correct port bindings. - */ - @VisibleForTesting - public void setInitialConf(RaftConfiguration initialConf) { - configurationManager.setInitialConf(initialConf); - } - - private long initStatemachine(StateMachine sm, RaftProperties properties) - throws IOException { - sm.initialize(selfId, properties, storage); - storage.setStateMachineStorage(sm.getStateMachineStorage()); - SnapshotInfo snapshot = sm.getLatestSnapshot(); - - if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) { - return RaftServerConstants.INVALID_LOG_INDEX; - } - - // get the raft configuration from the snapshot - RaftConfiguration raftConf = sm.getRaftConfiguration(); - if (raftConf != null) { - configurationManager.addConfiguration(raftConf.getLogEntryIndex(), - raftConf); - } - return snapshot.getIndex(); - } - - void start() { - stateMachineUpdater.start(); - } - - /** - * note we do not apply log entries to the state machine here since we do not - * know whether they have been committed. 
- */ - private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server, - long lastIndexInSnapshot) throws IOException { - final RaftLog log; - if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY, - RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) { - log = new MemoryRaftLog(id); - } else { - log = new SegmentedRaftLog(id, server, this.storage, - lastIndexInSnapshot, prop); - } - log.open(configurationManager, lastIndexInSnapshot); - return log; - } - - public RaftConfiguration getRaftConf() { - return configurationManager.getCurrent(); - } - - @VisibleForTesting - - public String getSelfId() { - return this.selfId; - } - - public long getCurrentTerm() { - return currentTerm; - } - - void setCurrentTerm(long term) { - currentTerm = term; - } - - String getLeaderId() { - return leaderId; - } - - boolean hasLeader() { - return leaderId != null; - } - - /** - * Become a candidate and start leader election - */ - long initElection() { - votedFor = selfId; - leaderId = null; - return ++currentTerm; - } - - void persistMetadata() throws IOException { - this.log.writeMetadata(currentTerm, votedFor); - } - - void resetLeaderAndVotedFor() { - votedFor = null; - leaderId = null; - } - - /** - * Vote for a candidate and update the local state. - */ - void grantVote(String candidateId) { - votedFor = candidateId; - leaderId = null; - } - - void setLeader(String leaderId) { - this.leaderId = leaderId; - } - - void becomeLeader() { - leaderId = selfId; - } - - public RaftLog getLog() { - return log; - } - - long applyLog(TransactionContext operation) throws IOException { - return log.append(currentTerm, operation); - } - - /** - * Check if accept the leader selfId and term from the incoming AppendEntries rpc. - * If accept, update the current state. 
- * @return true if the check passes - */ - boolean recognizeLeader(String leaderId, long leaderTerm) { - if (leaderTerm < currentTerm) { - return false; - } else if (leaderTerm > currentTerm || this.leaderId == null) { - // If the request indicates a term that is greater than the current term - // or no leader has been set for the current term, make sure to update - // leader and term later - return true; - } - Preconditions.checkArgument(this.leaderId.equals(leaderId), - "selfId:%s, this.leaderId:%s, received leaderId:%s", - selfId, this.leaderId, leaderId); - return true; - } - - /** - * Check if the candidate's term is acceptable - */ - boolean recognizeCandidate(String candidateId, - long candidateTerm) { - if (candidateTerm > currentTerm) { - return true; - } else if (candidateTerm == currentTerm) { - // has not voted yet or this is a retry - return votedFor == null || votedFor.equals(candidateId); - } - return false; - } - - boolean isLogUpToDate(TermIndex candidateLastEntry) { - LogEntryProto lastEntry = log.getLastEntry(); - // need to take into account snapshot - SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); - if (lastEntry == null && snapshot == null) { - return true; - } else if (candidateLastEntry == null) { - return false; - } - TermIndex local = ServerProtoUtils.toTermIndex(lastEntry); - if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) { - local = snapshot.getTermIndex(); - } - return local.compareTo(candidateLastEntry) <= 0; - } - - @Override - public String toString() { - return selfId + ":t" + currentTerm + ", leader=" + leaderId - + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf(); - } - - boolean isConfCommitted() { - return getLog().getLastCommittedIndex() >= - getRaftConf().getLogEntryIndex(); - } - - public void setRaftConf(long logIndex, RaftConfiguration conf) { - configurationManager.addConfiguration(logIndex, conf); - RaftServerImpl.LOG.info("{}: 
successfully update the configuration {}", - getSelfId(), conf); - } - - void updateConfiguration(LogEntryProto[] entries) { - if (entries != null && entries.length > 0) { - configurationManager.removeConfigurations(entries[0].getIndex()); - for (LogEntryProto entry : entries) { - if (ProtoUtils.isConfigurationLogEntry(entry)) { - final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration( - entry.getIndex(), entry.getConfigurationEntry()); - configurationManager.addConfiguration(entry.getIndex(), conf); - server.getServerRpc().addPeers(conf.getPeers()); - } - } - } - } - - void updateStatemachine(long majorityIndex, long currentTerm) { - log.updateLastCommitted(majorityIndex, currentTerm); - stateMachineUpdater.notifyUpdater(); - } - - void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) - throws IOException { - log.updateLastCommitted(lastIndexInSnapshot, currentTerm); - - stateMachineUpdater.reloadStateMachine(); - } - - @Override - public void close() throws IOException { - stateMachineUpdater.stop(); - RaftServerImpl.LOG.info("{} closes. 
The last applied log index is {}", - getSelfId(), getLastAppliedIndex()); - storage.close(); - } - - @VisibleForTesting - public RaftStorage getStorage() { - return storage; - } - - void installSnapshot(InstallSnapshotRequestProto request) throws IOException { - // TODO: verify that we need to install the snapshot - StateMachine sm = server.getStateMachine(); - sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(sm, request); - log.syncWithSnapshot(request.getTermIndex().getIndex()); - this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex( - request.getTermIndex()); - } - - SnapshotInfo getLatestSnapshot() { - return server.getStateMachine().getStateMachineStorage().getLatestSnapshot(); - } - - public TermIndex getLatestInstalledSnapshot() { - return latestInstalledSnapshot; - } - - @VisibleForTesting - public long getLastAppliedIndex() { - return stateMachineUpdater.getLastAppliedIndex(); - } - - boolean isCurrentConfCommitted() { - return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java deleted file mode 100644 index ac21386..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.raft.server.protocol.TermIndex; - -/** Server utilities for internal use. */ -public class ServerUtils { - public static TermIndex newTermIndex(long term, long index) { - return new TermIndexImpl(term, index); - } - - private static class TermIndexImpl implements TermIndex { - private final long term; - private final long index; //log index; first index is 1. - - TermIndexImpl(long term, long logIndex) { - this.term = term; - this.index = logIndex; - } - - @Override - public long getTerm() { - return term; - } - - @Override - public long getIndex() { - return index; - } - - @Override - public int compareTo(TermIndex that) { - final int d = Long.compare(this.getTerm(), that.getTerm()); - return d != 0 ? d : Long.compare(this.getIndex(), that.getIndex()); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } else if (obj == null || !(obj instanceof TermIndexImpl)) { - return false; - } - - final TermIndexImpl that = (TermIndexImpl) obj; - return this.getTerm() == that.getTerm() - && this.getIndex() == that.getIndex(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(term).append(index).hashCode(); - } - - private static String toString(long n) { - return n < 0 ? 
"~" : "" + n; - } - - @Override - public String toString() { - return "(t:" + toString(term) + ", i:" + toString(index) + ")"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java deleted file mode 100644 index f85639b..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import com.google.common.base.Preconditions; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.protocol.Message; -import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.server.storage.RaftStorage; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.Daemon; -import org.apache.raft.util.ExitUtils; -import org.apache.raft.util.LifeCycle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; -import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; - -/** - * This class tracks the log entries that have been committed in a quorum and - * applies them to the state machine. We let a separate thread do this work - * asynchronously so that this will not block normal raft protocol. - * - * If the auto log compaction is enabled, the state machine updater thread will - * trigger a snapshot of the state machine by calling - * {@link StateMachine#takeSnapshot} when the log size exceeds a limit. 
- */ -class StateMachineUpdater implements Runnable { - static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class); - - enum State { - RUNNING, STOP, RELOAD - } - - private final RaftProperties properties; - private final StateMachine stateMachine; - private final RaftServerImpl server; - private final RaftLog raftLog; - - private volatile long lastAppliedIndex; - - private final boolean autoSnapshotEnabled; - private final long snapshotThreshold; - private long lastSnapshotIndex; - - private final Thread updater; - private volatile State state = State.RUNNING; - - StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server, - RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) { - this.properties = properties; - this.stateMachine = stateMachine; - this.server = server; - this.raftLog = raftLog; - - this.lastAppliedIndex = lastAppliedIndex; - lastSnapshotIndex = lastAppliedIndex; - - autoSnapshotEnabled = properties.getBoolean( - RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY, - RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT); - snapshotThreshold = properties.getLong( - RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY, - RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT); - updater = new Daemon(this); - } - - void start() { - updater.start(); - } - - void stop() { - state = State.STOP; - updater.interrupt(); - try { - stateMachine.close(); - } catch (IOException ignored) { - } - } - - void reloadStateMachine() { - state = State.RELOAD; - notifyUpdater(); - } - - synchronized void notifyUpdater() { - notifyAll(); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "-" + raftLog.getSelfId(); - } - - @Override - public void run() { - final RaftStorage storage = server.getState().getStorage(); - while (isRunning()) { - try { - synchronized (this) { - // when the peers just start, the committedIndex is initialized as 0 - // and 
will be updated only after the leader contacts other peers. - // Thus initially lastAppliedIndex can be greater than lastCommitted. - while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) { - wait(); - } - } - - final long committedIndex = raftLog.getLastCommittedIndex(); - Preconditions.checkState(lastAppliedIndex < committedIndex); - - if (state == State.RELOAD) { - Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED); - - stateMachine.reinitialize(server.getId(), properties, storage); - - SnapshotInfo snapshot = stateMachine.getLatestSnapshot(); - Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex, - "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex); - - lastAppliedIndex = snapshot.getIndex(); - lastSnapshotIndex = snapshot.getIndex(); - state = State.RUNNING; - } - - while (lastAppliedIndex < committedIndex) { - final LogEntryProto next = raftLog.get(lastAppliedIndex + 1); - if (next != null) { - if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) { - // the reply should have already been set. only need to record - // the new conf in the state machine. - stateMachine.setRaftConfiguration( - ServerProtoUtils.toRaftConfiguration(next.getIndex(), - next.getConfigurationEntry())); - } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { - // check whether there is a TransactionContext because we are the leader. - TransactionContext trx = server.getTransactionContext(next.getIndex()); - if (trx == null) { - trx = new TransactionContext(stateMachine, next); - } - - // Let the StateMachine inject logic for committed transactions in sequential order. - trx = stateMachine.applyTransactionSerial(trx); - - // TODO: This step can be parallelized - CompletableFuture<Message> messageFuture = - stateMachine.applyTransaction(trx); - server.replyPendingRequest(next.getIndex(), messageFuture); - } - lastAppliedIndex++; - } else { - LOG.debug("{}: logEntry {} is null. 
There may be snapshot to load. state:{}", - this, lastAppliedIndex + 1, state); - break; - } - } - - // check if need to trigger a snapshot - if (shouldTakeSnapshot(lastAppliedIndex)) { - stateMachine.takeSnapshot(); - // TODO purge logs, including log cache. but should keep log for leader's RPCSenders - lastSnapshotIndex = lastAppliedIndex; - } - } catch (InterruptedException e) { - if (!isRunning()) { - LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this); - } else { - final String s = this + ": the StateMachineUpdater is wrongly interrupted"; - ExitUtils.terminate(1, s, e, LOG); - } - } catch (Throwable t) { - final String s = this + ": the StateMachineUpdater hits Throwable"; - ExitUtils.terminate(2, s, t, LOG); - } - } - } - - private boolean isRunning() { - return state != State.STOP; - } - - private boolean shouldTakeSnapshot(long currentAppliedIndex) { - return autoSnapshotEnabled && (state != State.RELOAD) && - (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold); - } - - long getLastAppliedIndex() { - return lastAppliedIndex; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java b/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java deleted file mode 100644 index 59e9bba..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.protocol; - -import org.apache.raft.shaded.proto.RaftProtos.*; - -import java.io.IOException; - -public interface RaftServerProtocol { - - RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException; - - AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException; - - InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java b/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java deleted file mode 100644 index df401d6..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.protocol; - -import org.apache.raft.server.impl.ServerUtils; - -/** The term and the log index defined in the Raft consensus algorithm. */ -public interface TermIndex extends Comparable<TermIndex> { - /** @return the term. */ - long getTerm(); - - /** @return the index. */ - long getIndex(); - - /** Create a new {@link TermIndex} instance. */ - static TermIndex newTermIndex(long term, long index) { - return ServerUtils.newTermIndex(term, index); - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java b/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java deleted file mode 100644 index 4440be9..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Base class of buffered channels: holds the wrapped {@link FileChannel} and
 * offers access to it that first checks the channel is still open.
 */
public abstract class BufferedChannelBase implements Closeable {
  protected final FileChannel fileChannel;

  /**
   * @param fc the file channel to wrap; must not be null
   * @throws NullPointerException if {@code fc} is null
   */
  protected BufferedChannelBase(FileChannel fc) {
    // Fail fast here instead of with an NPE on the first access.
    this.fileChannel = java.util.Objects.requireNonNull(fc, "fc == null");
  }

  /**
   * @return the wrapped file channel
   * @throws IOException if the channel has already been closed
   */
  protected FileChannel validateAndGetFileChannel() throws IOException {
    if (!fileChannel.isOpen()) {
      throw new IOException(
          "Attempting to access a file channel that has already been closed");
    }
    return fileChannel;
  }

  /**
   * Get the current size of the underlying FileChannel.
   *
   * @throws IOException if the channel is closed or its size cannot be read
   */
  public long size() throws IOException {
    return validateAndGetFileChannel().size();
  }

  /**
   * Get the {@link FileChannel} that this BufferedChannel wraps around.
   * No open check is performed.
   */
  public FileChannel getFileChannel() {
    return fileChannel;
  }
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Provides a buffering layer in front of a FileChannel for writing. - */ -public class BufferedWriteChannel extends BufferedChannelBase { - // The capacity of the write buffer. - private final int writeCapacity; - // The position of the file channel's write pointer. - private AtomicLong writeBufferStartPosition = new AtomicLong(0); - // The buffer used to write operations. - private final ByteBuffer writeBuffer; - // The absolute position of the next write operation. - private volatile long position; - - public BufferedWriteChannel(FileChannel fc, int writeCapacity) - throws IOException { - super(fc); - this.writeCapacity = writeCapacity; - this.position = fc.position(); - this.writeBufferStartPosition.set(position); - this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity); - } - - /** - * Write all the data in src to the {@link FileChannel}. Note that this function can - * buffer or re-order writes based on the implementation. These writes will be flushed - * to the disk only when flush() is invoked. 
- * - * @param src The source ByteBuffer which contains the data to be written. - * @throws IOException if a write operation fails. - */ - public void write(ByteBuffer src) throws IOException { - int copied = 0; - while (src.remaining() > 0) { - int truncated = 0; - if (writeBuffer.remaining() < src.remaining()) { - truncated = src.remaining() - writeBuffer.remaining(); - src.limit(src.limit() - truncated); - } - copied += src.remaining(); - writeBuffer.put(src); - src.limit(src.limit() + truncated); - // if we have run out of buffer space, we should flush to the file - if (writeBuffer.remaining() == 0) { - flushInternal(); - } - } - position += copied; - } - - /** - * Write the specified byte. - * @param b the byte to be written - */ - public void write(int b) throws IOException { - writeBuffer.put((byte) b); - if (writeBuffer.remaining() == 0) { - flushInternal(); - } - position++; - } - - public void write(byte[] b) throws IOException { - int offset = 0; - while (offset < b.length) { - int toPut = Math.min(b.length - offset, writeBuffer.remaining()); - writeBuffer.put(b, offset, toPut); - offset += toPut; - if (writeBuffer.remaining() == 0) { - flushInternal(); - } - } - position += b.length; - } - - /** - * Get the position where the next write operation will begin writing from. - */ - public long position() { - return position; - } - - /** - * Get the position of the file channel's write pointer. - */ - public long getFileChannelPosition() { - return writeBufferStartPosition.get(); - } - - - /** - * Write any data in the buffer to the file. If sync is set to true, force a - * sync operation so that data is persisted to the disk. - * - * @throws IOException if the write or sync operation fails. 
- */ - public void flush(boolean shouldForceWrite) throws IOException { - synchronized (this) { - flushInternal(); - } - if (shouldForceWrite) { - forceWrite(false); - } - } - - /** - * Write any data in the buffer to the file and advance the writeBufferPosition - * Callers are expected to synchronize appropriately - * - * @throws IOException if the write fails. - */ - private void flushInternal() throws IOException { - writeBuffer.flip(); - do { - fileChannel.write(writeBuffer); - } while (writeBuffer.hasRemaining()); - writeBuffer.clear(); - writeBufferStartPosition.set(fileChannel.position()); - } - - public long forceWrite(boolean forceMetadata) throws IOException { - // This is the point up to which we had flushed to the file system page cache - // before issuing this force write hence is guaranteed to be made durable by - // the force write, any flush that happens after this may or may - // not be flushed - long positionForceWrite = writeBufferStartPosition.get(); - fileChannel.force(forceMetadata); - return positionForceWrite; - } - - @Override - public void close() throws IOException { - fileChannel.close(); - writeBuffer.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java b/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java deleted file mode 100644 index cdeb622..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.storage; - -import org.apache.raft.io.MD5Hash; - -import java.nio.file.Path; - -/** - * Metadata about a file. - * - * The objects of this class are immutable. - */ -public class FileInfo { - private final Path path; - private final MD5Hash fileDigest; - private final long fileSize; - - public FileInfo(Path path, MD5Hash fileDigest) { - this.path = path; - this.fileDigest = fileDigest; - this.fileSize = path.toFile().length(); - } - - @Override - public String toString() { - return path.toString(); - } - - /** @return the path of the file. */ - public Path getPath() { - return path; - } - - /** @return the MD5 file digest of the file. */ - public MD5Hash getFileDigest() { - return fileDigest; - } - - /** @return the size of the file. 
*/ - public long getFileSize() { - return fileSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java deleted file mode 100644 index 95597b2..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.storage; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; - -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -public class LogInputStream implements Closeable { - static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class); - - static class LogValidation { - private final long validLength; - private final long endIndex; - private final boolean hasCorruptHeader; - - LogValidation(long validLength, long endIndex, boolean hasCorruptHeader) { - this.validLength = validLength; - this.endIndex = endIndex; - this.hasCorruptHeader = hasCorruptHeader; - } - - long getValidLength() { - return validLength; - } - - long getEndIndex() { - return endIndex; - } - - boolean hasCorruptHeader() { - return hasCorruptHeader; - } - } - - private enum State { - UNINIT, - OPEN, - CLOSED - } - - private final File logFile; - private final long startIndex; - private final long endIndex; - private final boolean isOpen; - private State state = State.UNINIT; - private LogReader reader; - - public LogInputStream(File log, long startIndex, long endIndex, - boolean isOpen) { - if (isOpen) { - Preconditions.checkArgument(endIndex == INVALID_LOG_INDEX); - } else { - Preconditions.checkArgument(endIndex >= startIndex); - } - - this.logFile = log; - this.startIndex = startIndex; - this.endIndex = endIndex; - this.isOpen = isOpen; - } - - private void init() throws IOException { - Preconditions.checkState(state == State.UNINIT); - try { - reader = new LogReader(logFile); - // read the log header - String header = reader.readLogHeader(); - Preconditions.checkState(SegmentedRaftLog.HEADER_STR.equals(header), - 
"Corrupted log header: %s", header); - state = State.OPEN; - } finally { - if (reader == null) { - state = State.CLOSED; - } - } - } - - long getStartIndex() { - return startIndex; - } - - long getEndIndex() { - return endIndex; - } - - String getName() { - return logFile.getName(); - } - - public LogEntryProto nextEntry() throws IOException { - LogEntryProto entry = null; - switch (state) { - case UNINIT: - try { - init(); - } catch (Throwable e) { - LOG.error("caught exception initializing " + this, e); - Throwables.propagateIfPossible(e, IOException.class); - } - Preconditions.checkState(state != State.UNINIT); - return nextEntry(); - case OPEN: - entry = reader.readEntry(); - if (entry != null) { - long index = entry.getIndex(); - if (!isOpen() && index >= endIndex) { - /** - * The end index may be derived from the segment recovery - * process. It is possible that we still have some uncleaned garbage - * in the end. We should skip them. - */ - long skipAmt = logFile.length() - reader.getPos(); - if (skipAmt > 0) { - LOG.debug("skipping {} bytes at the end of log '{}': reached" + - " entry {} out of {}", skipAmt, getName(), index, endIndex); - reader.skipFully(skipAmt); - } - } - } - break; - case CLOSED: - break; // return null - } - return entry; - } - - long scanNextEntry() throws IOException { - Preconditions.checkState(state == State.OPEN); - return reader.scanEntry(); - } - - long getPosition() { - if (state == State.OPEN) { - return reader.getPos(); - } else { - return 0; - } - } - - @Override - public void close() throws IOException { - if (state == State.OPEN) { - reader.close(); - } - state = State.CLOSED; - } - - boolean isOpen() { - return isOpen; - } - - @Override - public String toString() { - return getName(); - } - - /** - * @param file File being scanned and validated. - * @param maxTxIdToScan Maximum Tx ID to try to scan. - * The scan returns after reading this or a higher - * ID. 
The file portion beyond this ID is - * potentially being updated. - * @return Result of the validation - * @throws IOException - */ - static LogValidation scanEditLog(File file, long maxTxIdToScan) - throws IOException { - LogInputStream in; - try { - in = new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false); - // read the header, initialize the inputstream - in.init(); - } catch (EOFException e) { - LOG.warn("Log file " + file + " has no valid header", e); - return new LogValidation(0, INVALID_LOG_INDEX, true); - } - - try { - return scanEditLog(in, maxTxIdToScan); - } finally { - RaftUtils.cleanup(LOG, in); - } - } - - /** - * Find the last valid entry index in the stream. - * If there are invalid or corrupt entries in the middle of the stream, - * scanEditLog will skip over them. - * - * This reads through the stream but does not close it. - * - * @param maxIndexToScan Maximum entry index to try to scan. The scan returns - * after reading this or a higher index. The file - * portion beyond this index is potentially being - * updated. - */ - static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) { - long lastPos = 0; - long end = INVALID_LOG_INDEX; - long numValid = 0; - boolean hitError = false; - while (end < maxIndexToScan) { - long index; - lastPos = in.getPosition(); - try { - if (hitError) { - LogEntryProto entry = in.nextEntry(); - index = entry != null ? entry.getIndex() : INVALID_LOG_INDEX; - LOG.warn("After resync, position is " + in.getPosition()); - } else { - index = in.scanNextEntry(); - } - if (index == INVALID_LOG_INDEX) { - break; - } else { - hitError = false; - } - } catch (Throwable t) { - LOG.warn("Caught exception after scanning through {} ops from {}" - + " while determining its valid length. 
Position was " - + lastPos, numValid, in, t); - hitError = true; - continue; - } - if (end == INVALID_LOG_INDEX || index > end) { - end = index; - } - numValid++; - } - return new LogValidation(lastPos, end, false); - } -}
