Repository: incubator-ratis Updated Branches: refs/heads/master b0dc99205 -> c3845bc3f
RATIS-341. Raft log index on the follower should be applied to state machine only after writing the log. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c3845bc3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c3845bc3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c3845bc3 Branch: refs/heads/master Commit: c3845bc3f3044f513347d8fec220e4e289a0ea5b Parents: b0dc992 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sun Oct 7 05:34:53 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sun Oct 7 05:34:53 2018 +0800 ---------------------------------------------------------------------- .../ratis/grpc/server/GrpcLogAppender.java | 2 +- ratis-proto/src/main/proto/Raft.proto | 1 + .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 48 +++++++++----------- .../ratis/server/impl/ServerProtoUtils.java | 6 ++- .../apache/ratis/server/storage/RaftLog.java | 5 +- 6 files changed, 32 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 66c9948..11ff4d8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -266,7 +266,7 @@ public class GrpcLogAppender extends LogAppender { LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply)); return; } - updateCommitIndex(request.getLeaderCommit()); + updateCommitIndex(reply.getFollowerCommit()); final long replyNextIndex = reply.getNextIndex(); final long lastIndex = replyNextIndex - 1; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 535914d..e0916fd 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -147,6 +147,7 @@ message AppendEntriesReplyProto { uint64 term = 2; uint64 nextIndex = 3; AppendResult result = 4; + uint64 followerCommit = 5; } message InstallSnapshotRequestProto { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 3c9b2d4..4dff3e5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -238,7 +238,7 @@ public class LogAppender { final AppendEntriesReplyProto r = server.getServerRpc().appendEntries(request); follower.updateLastRpcResponseTime(); - updateCommitIndex(request.getLeaderCommit()); + updateCommitIndex(r.getFollowerCommit()); return r; } catch (InterruptedIOException | RaftLogIOException e) { throw e; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d4b32a1..5e7bd89 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -895,12 +895,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final long currentTerm; long nextIndex = state.getLog().getNextIndex(); + long followerCommit = state.getLog().getLastCommittedIndex(); synchronized (this) { final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER, callId); + leaderId, getId(), groupId, currentTerm, followerCommit, nextIndex, NOT_LEADER, callId); if (LOG.isDebugEnabled()) { LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}", getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply)); @@ -924,9 +925,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // last index included in snapshot. This is because indices <= snapshot's // last index should have been committed. if (previous != null && !containPrevious(previous)) { - final AppendEntriesReplyProto reply = - ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, - currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY, callId); + final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( + leaderId, getId(), groupId, currentTerm, followerCommit, Math.min(nextIndex, previous.getIndex()), + INCONSISTENCY, callId); if (LOG.isDebugEnabled()) { LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", getId(), previous, ServerProtoUtils.toString(reply)); @@ -937,33 +938,28 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou futures = state.getLog().append(entries); state.updateConfiguration(entries); - state.updateStatemachine(leaderCommit, currentTerm); - commitInfos.stream().forEach(c -> commitInfoCache.update(c)); + commitInfos.forEach(commitInfoCache::update); } if (entries.length > 0) { CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null); - nextIndex = entries[entries.length - 1].getIndex() + 1; } - final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( - leaderId, getId(), groupId, currentTerm, nextIndex, SUCCESS, callId); - logAppendEntries(isHeartbeat, - () -> getId() + ": succeeded to handle AppendEntries. Reply: " - + ServerProtoUtils.toString(reply)); - return JavaUtils.allOf(futures) - .thenApply(v -> { - // reset election timer to avoid punishing the leader for our own - // long disk writes - synchronized (this) { - if (lifeCycle.getCurrentState() == RUNNING && isFollower() - && getState().getCurrentTerm() == currentTerm) { - // reset election timer to avoid punishing the leader for our own - // long disk writes - heartbeatMonitor.updateLastRpcTime(false); - } - } - return reply; - }); + return JavaUtils.allOf(futures).thenApply(v -> { + final AppendEntriesReplyProto reply; + synchronized(this) { + if (lifeCycle.getCurrentState() == RUNNING && isFollower() + && getState().getCurrentTerm() == currentTerm) { + // reset election timer to avoid punishing the leader for our own long disk writes + heartbeatMonitor.updateLastRpcTime(false); + } + state.updateStatemachine(leaderCommit, currentTerm); + reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm, + state.getLog().getLastCommittedIndex(), state.getLog().getNextIndex(), SUCCESS, callId); + } + logAppendEntries(isHeartbeat, () -> + getId() + ": succeeded to handle AppendEntries. Reply: " + ServerProtoUtils.toString(reply)); + return reply; + }); } private boolean containPrevious(TermIndex previous) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 518db7c..53df265 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -76,7 +76,8 @@ public class ServerProtoUtils { public static String toString(AppendEntriesReplyProto reply) { return toString(reply.getServerReply()) + "," + reply.getResult() - + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm(); + + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm() + + ",followerCommit" + reply.getFollowerCommit(); } private static String toString(RaftRpcReplyProto reply) { @@ -175,7 +176,7 @@ public class ServerProtoUtils { public static AppendEntriesReplyProto toAppendEntriesReplyProto( RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long term, - long nextIndex, AppendResult result, long callId) { + long followerCommit, long nextIndex, AppendResult result, long callId) { RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder( requestorId, replyId, groupId, result == AppendResult.SUCCESS) .setCallId(callId); @@ -183,6 +184,7 @@ public class ServerProtoUtils { .setServerReply(rpcReply) .setTerm(term) .setNextIndex(nextIndex) + .setFollowerCommit(followerCommit) .setResult(result).build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 2adef40..b5a38fd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -94,8 +94,9 @@ public abstract class RaftLog implements Closeable { // paper for details. final TermIndex entry = getTermIndex(majorityIndex); if (entry != null && entry.getTerm() == currentTerm) { - LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex); - lastCommitted.set(majorityIndex); + final long commitIndex = Math.min(majorityIndex, getLatestFlushedIndex()); + LOG.debug("{}: Updating lastCommitted to {}", selfId, commitIndex); + lastCommitted.set(commitIndex); return true; } }
