Repository: incubator-ratis
Updated Branches:
  refs/heads/master 609773e03 -> 9296a3bc0

RATIS-443. FollowerState.inLogSync can be incorrectly cleared in appendEntriesAsync.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9296a3bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9296a3bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9296a3bc

Branch: refs/heads/master
Commit: 9296a3bc0b79cf73a6576e0b4c0b981d1ea3b221
Parents: 609773e
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Sun Dec 2 16:57:20 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Sun Dec 2 16:57:20 2018 -0800

----------------------------------------------------------------------
 ratis-proto/src/main/proto/Raft.proto | 2 +-
 .../apache/ratis/server/impl/FollowerState.java | 42 ++++--
 .../ratis/server/impl/LeaderElection.java | 2 +-
 .../apache/ratis/server/impl/LeaderState.java | 2 +-
 .../ratis/server/impl/RaftServerImpl.java | 135 +++++++++++--------
 5 files changed, 114 insertions(+), 69 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 83d4394..103c478 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -316,7 +316,7 @@ message LeaderInfoProto {

 message FollowerInfoProto {
   ServerRpcProto leaderInfo = 1;
-  bool inLogSync = 2;
+  uint32 outstandingOp = 2;
 }

 message CandidateInfoProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index f526091..903ab5e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,35 +22,59 @@ import org.apache.ratis.util.Timestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.ToIntFunction; + /** * Used when the peer is a follower. Used to track the election timeout. */ class FollowerState extends Daemon { + enum UpdateType { + APPEND_START(AtomicInteger::incrementAndGet), + APPEND_COMPLETE(AtomicInteger::decrementAndGet), + INSTALL_SNAPSHOT_START(AtomicInteger::incrementAndGet), + INSTALL_SNAPSHOT_COMPLETE(AtomicInteger::decrementAndGet), + REQUEST_VOTE(AtomicInteger::get); + + private final ToIntFunction<AtomicInteger> updateFunction; + + UpdateType(ToIntFunction<AtomicInteger> updateFunction) { + this.updateFunction = updateFunction; + } + + int update(AtomicInteger outstanding) { + return updateFunction.applyAsInt(outstanding); + } + } + static final Logger LOG = LoggerFactory.getLogger(FollowerState.class); private final RaftServerImpl server; private volatile Timestamp lastRpcTime = new Timestamp(); private volatile boolean monitorRunning = true; - private volatile boolean inLogSync = false; + private final AtomicInteger outstandingOp = new AtomicInteger(); FollowerState(RaftServerImpl server) { this.server = server; } - void updateLastRpcTime(boolean inLogSync) { + void updateLastRpcTime(UpdateType type) { lastRpcTime = new Timestamp(); - LOG.trace("{} update last rpc time to {} {}", server.getId(), - lastRpcTime, inLogSync); - this.inLogSync = inLogSync; + + final int n = 
type.update(outstandingOp); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: update lastRpcTime to {} for {}, outstandingOp={}", + server.getId(), lastRpcTime, type, n); + } } Timestamp getLastRpcTime() { return lastRpcTime; } - public boolean isInLogSync() { - return inLogSync; + int getOutstandingOp() { + return outstandingOp.get(); } boolean shouldWithholdVotes() { @@ -72,7 +96,7 @@ class FollowerState extends Daemon { break; } synchronized (server) { - if (!inLogSync && lastRpcTime.elapsedTimeMs() >= electionTimeout) { + if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) { LOG.info("{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms", server.getId(), lastRpcTime.elapsedTimeMs(), electionTimeout); // election timeout, should become a candidate http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index d62b1a7..5cdc8a9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -170,7 +170,7 @@ class LeaderElection extends Daemon { case DISCOVERED_A_NEW_TERM: final long term = r.term > server.getState().getCurrentTerm() ? 
r.term : server.getState().getCurrentTerm(); - server.changeToFollowerAndPersistMetadata(term); + server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM); return; case TIMEOUT: // should start another election http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 032c3a9..1bc6e79 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -386,7 +386,7 @@ public class LeaderState { private void stepDown(long term) { try { - server.changeToFollowerAndPersistMetadata(term); + server.changeToFollowerAndPersistMetadata(term, "stepDown"); } catch(IOException e) { final String s = server.getId() + ": Failed to persist metadata for term " + term; LOG.warn(s, e); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9296a3bc/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3c42fcd..4ea78ce 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -157,9 +157,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return proxy.getServerRpc(); } - private void setRole(RaftPeerRole newRole, String op) { + private void setRole(RaftPeerRole newRole, Object reason) { LOG.info("{} changes role from {} to {} at term {} for {}", - getId(), this.role, newRole, state.getCurrentTerm(), op); + getId(), this.role, newRole, state.getCurrentTerm(), reason); this.role.transitionRole(newRole); } @@ -286,29 +286,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } /** - * Change the server state to Follower if necessary + * Change the server state to Follower if this server is in a different role or force is true. * @param newTerm The new term. + * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term - * @throws IOException if term/votedFor persistence failed. 
*/ - private synchronized boolean changeToFollower(long newTerm) { + private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) { final RaftPeerRole old = role.getCurrentRole(); final boolean metadataUpdated = state.updateCurrentTerm(newTerm); - if (old != RaftPeerRole.FOLLOWER) { - setRole(RaftPeerRole.FOLLOWER, "changeToFollower"); + if (old != RaftPeerRole.FOLLOWER || force) { + setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { role.shutdownLeaderState(false); } else if (old == RaftPeerRole.CANDIDATE) { role.shutdownLeaderElection(); + } else if (old == RaftPeerRole.FOLLOWER) { + role.shutdownFollowerState(); } role.startFollowerState(this); } return metadataUpdated; } - synchronized void changeToFollowerAndPersistMetadata(long newTerm) throws IOException { - if (changeToFollower(newTerm)) { + synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException { + if (changeToFollower(newTerm, false, reason)) { state.persistMetadata(); } } @@ -368,7 +370,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou getRaftConf().getPeer(state.getLeaderId()), fs.getLastRpcTime().elapsedTimeMs()); roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder() .setLeaderInfo(leaderInfo) - .setInLogSync(fs.isInLogSync())); + .setOutstandingOp(fs.getOutstandingOp())); }); break; @@ -734,10 +736,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(), fs != null? 
fs.getLastRpcTime().elapsedTimeMs() + "ms": null); } else if (state.recognizeCandidate(candidateId, candidateTerm)) { - final boolean termUpdated = changeToFollower(candidateTerm); + final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId); // see Section 5.4.1 Election restriction if (state.isLogUpToDate(candidateLastEntry) && fs != null) { - fs.updateLastRpcTime(false); + fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE); state.grantVote(candidateId); voteGranted = true; } @@ -806,10 +808,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou .toArray(new LogEntryProto[r.getEntriesCount()]); final TermIndex previous = r.hasPreviousLog() ? ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()), - ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(), - previous, r.getLeaderCommit(), request.getCallId(), r.getInitializing(), - r.getCommitInfosList(), entries); + final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId()); + + preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(), + previous, r.getLeaderCommit(), r.getInitializing(), entries); + try { + return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(), + request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries); + } catch(Throwable t) { + LOG.error(getId() + ": Failed appendEntriesAsync " + r, t); + throw t; + } } static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) { @@ -824,24 +833,20 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } } - private void updateLastRpcTime(boolean inLogSync) { - if (lifeCycle.getCurrentState() == RUNNING) { - role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(inLogSync)); + private Optional<FollowerState> 
updateLastRpcTime(FollowerState.UpdateType updateType) { + final Optional<FollowerState> fs = role.getFollowerState(); + if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) { + fs.get().updateLastRpcTime(updateType); + return fs; + } else { + return Optional.empty(); } } - private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( - RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, - TermIndex previous, long leaderCommit, long callId, boolean initializing, - List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException { + private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, + TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException { CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, leaderTerm, previous, leaderCommit, initializing, entries); - final boolean isHeartbeat = entries.length == 0; - logAppendEntries(isHeartbeat, - () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", " - + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing - + ", commits" + ProtoUtils.toString(commitInfos) - + ", entries: " + ServerProtoUtils.toString(entries)); final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING); if (currentState == STARTING) { @@ -856,12 +861,23 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } catch (IllegalArgumentException e) { throw new IOException(e); } + } + private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( + RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing, + List<CommitInfoProto> commitInfos, LogEntryProto... 
entries) { + final boolean isHeartbeat = entries.length == 0; + logAppendEntries(isHeartbeat, + () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", " + + previous + ", " + leaderCommit + ", " + initializing + + ", commits" + ProtoUtils.toString(commitInfos) + + ", entries: " + ServerProtoUtils.toString(entries)); final List<CompletableFuture<Long>> futures; final long currentTerm; final long nextIndex = state.getLog().getNextIndex(); final long followerCommit = state.getLog().getLastCommittedIndex(); + final Optional<FollowerState> followerState; synchronized (this) { final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -874,13 +890,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } return CompletableFuture.completedFuture(reply); } - changeToFollowerAndPersistMetadata(leaderTerm); + try { + changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries"); + } catch (IOException e) { + return JavaUtils.completeExceptionally(e); + } state.setLeader(leaderId, "appendEntries"); if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { role.startFollowerState(this); } - updateLastRpcTime(true); + followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START); // We need to check if "previous" is in the local peer. Note that it is // possible that "previous" is covered by the latest snapshot: e.g., @@ -896,6 +916,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou LOG.debug("{}: inconsistency entries. 
Leader previous:{}, Reply:{}", getId(), previous, ServerProtoUtils.toString(reply)); } + followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); return CompletableFuture.completedFuture(reply); } @@ -908,14 +929,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (!isHeartbeat) { CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null); } - return JavaUtils.allOf(futures).thenApplyAsync(v -> { + return JavaUtils.allOf(futures).whenCompleteAsync( + (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)) + ).thenApply(v -> { final AppendEntriesReplyProto reply; synchronized(this) { - if (lifeCycle.getCurrentState() == RUNNING && isFollower() - && getState().getCurrentTerm() == currentTerm) { - // reset election timer to avoid punishing the leader for our own long disk writes - updateLastRpcTime(false); - } state.updateStatemachine(leaderCommit, currentTerm); final long n = isHeartbeat? 
state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1; reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm, @@ -957,6 +975,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex( request.getTermIndex()); final long lastIncludedIndex = lastTermIndex.getIndex(); + final Optional<FollowerState> followerState; synchronized (this) { final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); @@ -968,28 +987,30 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou " Reply: {}", getId(), reply); return reply; } - changeToFollowerAndPersistMetadata(leaderTerm); + changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); - updateLastRpcTime(true); - - // Check and append the snapshot chunk. We simply put this in lock - // considering a follower peer requiring a snapshot installation does not - // have a lot of requests - Preconditions.assertTrue( - state.getLog().getNextIndex() <= lastIncludedIndex, - "%s log's next id is %s, last included index in snapshot is %s", - getId(), state.getLog().getNextIndex(), lastIncludedIndex); - - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - // update the committed index - // re-load the state machine if this is the last chunk - if (request.getDone()) { - state.reloadStateMachine(lastIncludedIndex, leaderTerm); + followerState = updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); + try { + // Check and append the snapshot chunk. 
We simply put this in lock + // considering a follower peer requiring a snapshot installation does not + // have a lot of requests + Preconditions.assertTrue( + state.getLog().getNextIndex() <= lastIncludedIndex, + "%s log's next id is %s, last included index in snapshot is %s", + getId(), state.getLog().getNextIndex(), lastIncludedIndex); + + //TODO: We should only update State with installed snapshot once the request is done. + state.installSnapshot(request); + + // update the committed index + // re-load the state machine if this is the last chunk + if (request.getDone()) { + state.reloadStateMachine(lastIncludedIndex, leaderTerm); + } + } finally { + updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE); } - updateLastRpcTime(false); } if (request.getDone()) { LOG.info("{}: successfully install the whole snapshot-{}", getId(),
