This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 21d7526 RATIS-515. IllegalStateException while updating matchIndex.
Contributed by Tsz Wo Nicholas Sze.
21d7526 is described below
commit 21d7526217795b6477e814365c216c9597c226ee
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue May 14 19:31:11 2019 +0530
RATIS-515. IllegalStateException while updating matchIndex. Contributed by
Tsz Wo Nicholas Sze.
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 103 ++++++++-------------
.../org/apache/ratis/server/impl/FollowerInfo.java | 4 +-
.../org/apache/ratis/server/impl/LogAppender.java | 23 ++---
.../apache/ratis/server/impl/RaftServerImpl.java | 99 +++++++-------------
.../apache/ratis/server/impl/ServerProtoUtils.java | 18 ++--
.../org/apache/ratis/server/impl/ServerState.java | 41 ++++----
.../org/apache/ratis/server/storage/RaftLog.java | 8 +-
.../statemachine/SimpleStateMachine4Testing.java | 3 +-
.../ratis/grpc/TestInstallSnapshotWithGrpc.java | 2 +-
9 files changed, 128 insertions(+), 173 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b8926f1..0403ae9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -84,8 +84,9 @@ public class GrpcLogAppender extends LogAppender {
// clear the pending requests queue and reset the next index of follower
final long nextIndex = request != null && request.hasPreviousLog()?
- request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
- clearPendingRequests(nextIndex);
+ request.getPreviousLog().getIndex() + 1: follower.getMatchIndex() + 1;
+ pendingRequests.clear();
+ follower.decreaseNextIndex(nextIndex);
}
@Override
@@ -222,37 +223,51 @@ public class GrpcLogAppender extends LogAppender {
*/
@Override
public void onNext(AppendEntriesReplyProto reply) {
+ final AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: received {} reply {} ", getFollower().getName(),
- firstResponseReceived? "a": "the first",
ServerProtoUtils.toString(reply));
+ LOG.debug("{}: received {} reply {}, request={}",
+ follower.getName(), firstResponseReceived? "a": "the first",
+ ServerProtoUtils.toString(reply),
ServerProtoUtils.toString(request));
}
try {
- onNextImpl(reply);
+ onNextImpl(request, reply);
} catch(Throwable t) {
- LOG.error("Failed onNext " + reply, t);
+ LOG.error("Failed onNext request=" + ServerProtoUtils.toString(request)
+ + ", reply=" + ServerProtoUtils.toString(reply), t);
}
}
- private void onNextImpl(AppendEntriesReplyProto reply) {
+ private void onNextImpl(AppendEntriesRequestProto request,
AppendEntriesReplyProto reply) {
// update the last rpc time
follower.updateLastRpcResponseTime();
if (!firstResponseReceived) {
firstResponseReceived = true;
}
+ if (request == null) {
+ // The request is already handled (probably timeout), ignore the reply.
+ LOG.warn("{}: Request not found, ignoring reply: {}", this,
ServerProtoUtils.toString(reply));
+ return;
+ }
+
switch (reply.getResult()) {
case SUCCESS:
- onSuccess(reply);
+ updateCommitIndex(reply.getFollowerCommit());
+ if (checkAndUpdateMatchIndex(request)) {
+ submitEventOnSuccessAppend();
+ }
break;
case NOT_LEADER:
- onNotLeader(reply);
+ if (checkResponseTerm(reply.getTerm())) {
+ return;
+ }
break;
case INCONSISTENCY:
- onInconsistency(reply);
+ checkAndUpdateNextIndex(request, reply.getNextIndex());
break;
default:
- break;
+ throw new IllegalStateException("Unexpected reply result: " +
reply.getResult());
}
notifyAppend();
}
@@ -269,7 +284,7 @@ public class GrpcLogAppender extends LogAppender {
GrpcUtil.warn(LOG, () -> getFollower().getName() + ": Failed
appendEntries", t);
long callId = GrpcUtil.getCallId(t);
- resetClient(pendingRequests.get(callId));
+ resetClient(pendingRequests.remove(callId));
}
@Override
@@ -279,61 +294,19 @@ public class GrpcLogAppender extends LogAppender {
}
}
- private void clearPendingRequests(long newNextIndex) {
- pendingRequests.clear();
- follower.decreaseNextIndex(newNextIndex);
+ private boolean checkAndUpdateMatchIndex(AppendEntriesRequestProto request) {
+ final int n = request.getEntriesCount();
+ final long newMatchIndex = n == 0? request.getPreviousLog().getIndex():
request.getEntries(n - 1).getIndex();
+ return follower.updateMatchIndex(newMatchIndex);
}
- private synchronized void onSuccess(AppendEntriesReplyProto reply) {
- AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
- if (request == null) {
- // If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Request not found, ignoring SUCCESS reply: {}", this,
ServerProtoUtils.toString(reply));
- return;
- }
- updateCommitIndex(reply.getFollowerCommit());
-
- final long replyNextIndex = reply.getNextIndex();
- final long lastIndex = replyNextIndex - 1;
- final boolean updateMatchIndex;
-
- if (request.getEntriesCount() == 0) {
- Preconditions.assertTrue(!request.hasPreviousLog() ||
- lastIndex == request.getPreviousLog().getIndex(),
- "reply's next index is %s, request's previous is %s",
- replyNextIndex, request.getPreviousLog());
- updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex()
< lastIndex;
- } else {
- // check if the reply and the pending request is consistent
- final long lastEntryIndex = request
- .getEntries(request.getEntriesCount() - 1).getIndex();
- Preconditions.assertTrue(lastIndex == lastEntryIndex,
- "reply's next index is %s, request's last entry index is %s",
- replyNextIndex, lastEntryIndex);
- updateMatchIndex = true;
- }
- if (updateMatchIndex) {
- follower.updateMatchIndex(lastIndex);
- submitEventOnSuccessAppend();
- }
- }
-
- private void onNotLeader(AppendEntriesReplyProto reply) {
- checkResponseTerm(reply.getTerm());
- // the running loop will end and the connection will onComplete
- }
-
- private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
- AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
- if (request == null) {
- // If reply comes after timeout, the reply is ignored.
- LOG.warn("{}: Request not found, ignoring INCONSISTENCY reply: {}",
this, ServerProtoUtils.toString(reply));
- return;
- }
+ private void checkAndUpdateNextIndex(AppendEntriesRequestProto request, long
replyNextIndex) {
Preconditions.assertTrue(request.hasPreviousLog());
- if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
- pendingRequests.clear();
- follower.updateNextIndex(reply.getNextIndex());
+ if (request.getPreviousLog().getIndex() >= replyNextIndex) {
+ synchronized (this) {
+ pendingRequests.clear();
+ follower.updateNextIndex(replyNextIndex);
+ }
}
}
@@ -514,7 +487,7 @@ public class GrpcLogAppender extends LogAppender {
* its own State Machine.
* @return the first available log's start term index
*/
- protected TermIndex shouldNotifyToInstallSnapshot() {
+ private TermIndex shouldNotifyToInstallSnapshot() {
if (follower.getNextIndex() < raftLog.getStartIndex()) {
// The Leader does not have the logs from the Follower's last log
// index onwards. And install snapshot is disabled. So the Follower
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 6d52b7b..ec02553 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -62,8 +62,8 @@ public class FollowerInfo {
return matchIndex.get();
}
- public void updateMatchIndex(long newMatchIndex) {
- matchIndex.updateIncreasingly(newMatchIndex, debugIndexChange);
+ public boolean updateMatchIndex(long newMatchIndex) {
+ return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
}
/** @return the commit index acked by the follower. */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index fed52bc..0052a2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -170,17 +170,16 @@ public class LogAppender {
}
private TermIndex getPrevious() {
- TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1);
- if (previous == null) {
- // if previous is null, nextIndex must be equal to the log start
- // index (otherwise we will install snapshot).
- Preconditions.assertTrue(follower.getNextIndex() ==
raftLog.getStartIndex(),
- "%s: follower's next index %s, local log start index %s",
- this, follower.getNextIndex(), raftLog.getStartIndex());
- SnapshotInfo snapshot = server.getState().getLatestSnapshot();
- previous = snapshot == null ? null : snapshot.getTermIndex();
+ final long nextIndex = follower.getNextIndex();
+ final TermIndex previous = raftLog.getTermIndex(nextIndex - 1);
+ if (previous != null) {
+ return previous;
}
- return previous;
+ final long logStartIndex = raftLog.getStartIndex();
+ Preconditions.assertTrue(nextIndex == logStartIndex,
+ "%s: follower's nextIndex = %s != logStartIndex = %s", this,
nextIndex, logStartIndex);
+ final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+ return snapshot == null ? null : snapshot.getTermIndex();
}
protected AppendEntriesRequestProto createRequest(long callId) throws
RaftLogIOException {
@@ -525,12 +524,14 @@ public class LogAppender {
return halfMinTimeoutMs - follower.getLastRpcTime().elapsedTimeMs();
}
- protected void checkResponseTerm(long responseTerm) {
+ protected boolean checkResponseTerm(long responseTerm) {
synchronized (server) {
if (isAppenderRunning() && follower.isAttendingVote()
&& responseTerm > leaderState.getCurrentTerm()) {
leaderState.submitStepDownEvent(responseTerm);
+ return true;
}
}
+ return false;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f242aac..584332d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -888,14 +888,13 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
- final long nextIndex = state.getNextIndex();
final Optional<FollowerState> followerState;
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), groupId, currentTerm, followerCommit,
nextIndex, NOT_LEADER, callId);
+ leaderId, getId(), groupId, currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {}
reply: {}",
getId(), leaderId, leaderTerm, state,
ServerProtoUtils.toString(reply));
@@ -919,12 +918,11 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
// 1. There is a snapshot installation in progress
// 2. There is an overlap between the snapshot index and the entries
// 3. There is a gap between the local log and the entries
- // In any of these scenarios, we should retrun an INCONSISTENCY reply
- // back to leader so that the leader can update this follower's next
- // index.
+ // In any of these scenarios, we should return an INCONSISTENCY reply
+ // back to leader so that the leader can update this follower's next
index.
AppendEntriesReplyProto inconsistencyReply =
checkInconsistentAppendEntries(
- leaderId, currentTerm, followerCommit, previous, nextIndex, callId,
entries);
+ leaderId, currentTerm, followerCommit, previous, callId, entries);
if (inconsistencyReply != null) {
followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(inconsistencyReply);
@@ -956,73 +954,46 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId
leaderId, long currentTerm,
- long followerCommit, TermIndex previous, long nextIndex, long callId,
LogEntryProto... entries) {
- long replyNextIndex = -1;
-
- // Check if a snapshot installation through state machine is in progress.
- if (inProgressInstallSnapshotRequest.get() != null) {
- replyNextIndex = Math.min(nextIndex, previous.getIndex());
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Cannot append entries as snapshot installation is in " +
- "progress. Follower next index: {}", getId(), replyNextIndex);
- }
+ long followerCommit, TermIndex previous, long callId, LogEntryProto...
entries) {
+ final long replyNextIndex = checkInconsistentAppendEntries(previous,
entries);
+ if (replyNextIndex == -1) {
+ return null;
}
- // If a snapshot installation has happened, the new snapshot might
- // include the log entry indices sent as part of the
- // AppendEntriesRequestProto. Check that the first log entry proto is
- // greater than the last index included in the latest snapshot. If not,
- // the leader should be informed about the new snapshot index so that
- // it can send log entries only from the next log index
- long snapshotIndex = state.getSnapshotIndex();
- if (snapshotIndex > 0 && entries != null && entries.length > 0
- && entries[0].getIndex() <= snapshotIndex) {
- replyNextIndex = snapshotIndex + 1;
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Cannot append entries as latest snapshot already has " +
- "the append entries. Snapshot index: {}, first append entry " +
- "index: {}.", getId(), snapshotIndex, entries[0].getIndex());
- }
- }
+ final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
+ leaderId, getId(), groupId, currentTerm, followerCommit,
replyNextIndex, INCONSISTENCY, callId);
+ LOG.info("{}: inconsistency entries. Reply:{}", getId(),
ServerProtoUtils.toString(reply));
+ return reply;
+ }
- // We need to check if "previous" is in the local peer. Note that it is
- // possible that "previous" is covered by the latest snapshot: e.g.,
- // it's possible there's no log entries outside of the latest snapshot.
- // However, it is not possible that "previous" index is smaller than the
- // last index included in snapshot. This is because indices <= snapshot's
- // last index should have been committed.
- if (previous != null && !containPrevious(previous)) {
- replyNextIndex = Math.min(nextIndex, previous.getIndex());
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Cannot append entries as there is a gap between " +
- "local log and append entries. Previous is not present. " +
- "Previous: {}, follower next index: {}", getId(), previous,
replyNextIndex);
- }
+ private long checkInconsistentAppendEntries(TermIndex previous,
LogEntryProto... entries) {
+ // Check if a snapshot installation through state machine is in progress.
+ final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
+ if (installSnapshot != null) {
+ LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in
progress", getId(), installSnapshot);
+ return installSnapshot.getIndex();
}
- if (replyNextIndex != -1) {
- final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), groupId, currentTerm, followerCommit,
replyNextIndex,
- INCONSISTENCY, callId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: inconsistency entries. Reply:{}", getId(),
ServerProtoUtils.toString(reply));
+ // Check that the first log entry is greater than the snapshot index in
the latest snapshot.
+ // If not, reply to the leader the new next index.
+ if (entries != null && entries.length > 0) {
+ final long firstEntryIndex = entries[0].getIndex();
+ final long snapshotIndex = state.getSnapshotIndex();
+ if (snapshotIndex > 0 && snapshotIndex >= firstEntryIndex) {
+ LOG.info("{}: Failed appendEntries as latest snapshot ({}) already has
the append entries (first index: {})",
+ getId(), snapshotIndex, firstEntryIndex);
+ return snapshotIndex + 1;
}
- return reply;
}
- return null;
- }
-
- private boolean containPrevious(TermIndex previous) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
- getId(), previous, state.getLatestSnapshot(),
state.getLatestInstalledSnapshot());
+ // Check if "previous" is contained in current state.
+ if (previous != null && !state.containsTermIndex(previous)) {
+ final long replyNextIndex = Math.min(state.getNextIndex(),
previous.getIndex());
+ LOG.info("{}: Failed appendEntries as previous log entry ({}) is not
found", getId(), previous);
+ return replyNextIndex;
}
- return state.getLog().contains(previous)
- || (state.getLatestSnapshot() != null
- && state.getLatestSnapshot().getTermIndex().equals(previous))
- || (state.getLatestInstalledSnapshot() != null)
- && state.getLatestInstalledSnapshot().equals(previous);
+
+ return -1;
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index ed5c728..ff1e368 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -94,6 +94,9 @@ public interface ServerProtoUtils {
: "size=" + entries.size() + ", first=" +
toLogEntryString(entries.get(0));
}
static String toString(AppendEntriesRequestProto proto) {
+ if (proto == null) {
+ return null;
+ }
return ProtoUtils.toString(proto.getServerRequest()) + "-t" +
proto.getLeaderTerm()
+ ", previous=" + toTermIndexString(proto.getPreviousLog())
+ ", leaderCommit=" + proto.getLeaderCommit()
@@ -101,18 +104,19 @@ public interface ServerProtoUtils {
+ ", entries: " + toShortString(proto.getEntriesList());
}
static String toString(AppendEntriesReplyProto reply) {
- return toString(reply.getServerReply()) + "," + reply.getResult()
+ if (reply == null) {
+ return null;
+ }
+ return ProtoUtils.toString(reply.getServerReply()) + "," +
reply.getResult()
+ ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
+ ",followerCommit:" + reply.getFollowerCommit();
}
static String toString(RequestVoteReplyProto proto) {
- return toString(proto.getServerReply()) + "-t" + proto.getTerm();
- }
-
- static String toString(RaftRpcReplyProto reply) {
- return reply.getRequestorId().toStringUtf8() + "->"
- + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess();
+ if (proto == null) {
+ return null;
+ }
+ return ProtoUtils.toString(proto.getServerReply()) + "-t" +
proto.getTerm();
}
static RaftConfigurationProto.Builder
toRaftConfigurationProto(RaftConfiguration conf) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 8f2cbed..aa21956 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -42,6 +42,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -87,7 +88,7 @@ public class ServerState implements Closeable {
* snapshot. Once we successfully install a snapshot, the SM may not pick it
up immediately.
* Further, this will not get updated when SM does snapshots itself.
*/
- private volatile TermIndex latestInstalledSnapshot;
+ private final AtomicReference<TermIndex> latestInstalledSnapshot = new
AtomicReference<>();
ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
RaftServerImpl server, StateMachine stateMachine)
@@ -407,35 +408,31 @@ public class ServerState implements Closeable {
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
snapshotManager.installSnapshot(sm, request);
- log.syncWithSnapshot(request.getSnapshotChunk().getTermIndex().getIndex());
- this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
- request.getSnapshotChunk().getTermIndex());
+
updateInstalledSnapshotIndex(ServerProtoUtils.toTermIndex(request.getSnapshotChunk().getTermIndex()));
}
void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
log.syncWithSnapshot(lastTermIndexInSnapshot.getIndex());
- this.latestInstalledSnapshot = lastTermIndexInSnapshot;
+ latestInstalledSnapshot.set(lastTermIndexInSnapshot);
}
SnapshotInfo getLatestSnapshot() {
- return
server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
+ return server.getStateMachine().getLatestSnapshot();
}
- public TermIndex getLatestInstalledSnapshot() {
- return latestInstalledSnapshot;
+ public long getLatestInstalledSnapshotIndex() {
+ final TermIndex ti = latestInstalledSnapshot.get();
+ return ti != null? ti.getIndex(): 0L;
}
/**
- * The last index included in either the latestSnapshot or the
- * latestInsalledSnapshot
+ * The last index included in either the latestSnapshot or the
latestInstalledSnapshot
* @return the last snapshot index
*/
- public long getSnapshotIndex() {
- final long latestSnapshotIndex = getLatestSnapshot() != null ?
- getLatestSnapshot().getIndex() : 0;
- final long latestInstalledSnapshotIndex = latestInstalledSnapshot != null ?
- latestInstalledSnapshot.getIndex() : 0;
- return Math.max(latestSnapshotIndex, latestInstalledSnapshotIndex);
+ long getSnapshotIndex() {
+ final SnapshotInfo s = getLatestSnapshot();
+ final long latestSnapshotIndex = s != null ? s.getIndex() : 0;
+ return Math.max(latestSnapshotIndex, getLatestInstalledSnapshotIndex());
}
public long getNextIndex() {
@@ -447,4 +444,16 @@ public class ServerState implements Closeable {
public long getLastAppliedIndex() {
return stateMachineUpdater.getLastAppliedIndex();
}
+
+ boolean containsTermIndex(TermIndex ti) {
+ Objects.requireNonNull(ti, "ti == null");
+
+ if
(Optional.ofNullable(latestInstalledSnapshot.get()).filter(ti::equals).isPresent())
{
+ return true;
+ }
+ if
(Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent())
{
+ return true;
+ }
+ return log.contains(ti);
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index f1379e4..84e20ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
@@ -123,11 +124,8 @@ public abstract class RaftLog implements
RaftLogSequentialOps, Closeable {
* by the leader.
*/
public boolean contains(TermIndex ti) {
- if (ti == null) {
- return false;
- }
- TermIndex local = getTermIndex(ti.getIndex());
- return ti.equals(local);
+ Objects.requireNonNull(ti, "ti == null");
+ return ti.equals(getTermIndex(ti.getIndex()));
}
/**
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index ac3813f..5e5c9c6 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -292,8 +292,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
snapshot == null ? null : snapshot.getFile());
return RaftServerConstants.INVALID_LOG_INDEX;
} else {
- LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(),
- snapshot.getIndex(), snapshot.getFile().getPath());
+ LOG.info("Loading snapshot {}", snapshot);
final long endIndex = snapshot.getIndex();
try (LogInputStream in = new LogInputStream(
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
index e512262..7f75fff 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -175,7 +175,7 @@ public class TestInstallSnapshotWithGrpc {
for (RaftServerImpl follower : cluster.getFollowers()) {
follower.getState().getStorage().getStorageDir().getStateMachineDir();
Assert.assertEquals(leaderSnapshotInfo.getIndex(),
- follower.getState().getLatestInstalledSnapshot().getIndex());
+ follower.getState().getLatestInstalledSnapshotIndex());
}
// restart the peer and check if it can correctly handle conf change