This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 98a4c483b RATIS-2487. Trigger installSnapshot if leader cannot get previous entry (#1420)
98a4c483b is described below
commit 98a4c483b78980948f87236995f13885b9cc5a4a
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Apr 4 02:01:22 2026 +0800
RATIS-2487. Trigger installSnapshot if leader cannot get previous entry (#1420)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 38 ---------
.../apache/ratis/server/leader/LogAppender.java | 83 +++++++++++++++----
.../ratis/server/leader/LogAppenderBase.java | 29 ++-----
.../java/org/apache/ratis/LogAppenderTests.java | 92 ++++++++++++++++++++++
4 files changed, 167 insertions(+), 75 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b4d78c207..69421e9f0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -832,44 +832,6 @@ public class GrpcLogAppender extends LogAppenderBase {
responseHandler.waitForResponse();
}
- /**
- * Should the Leader notify the Follower to install the snapshot through
- * its own State Machine.
- * @return the first available log's start term index
- */
- private TermIndex shouldNotifyToInstallSnapshot() {
- final FollowerInfo follower = getFollower();
- final long leaderNextIndex = getRaftLog().getNextIndex();
- final boolean isFollowerBootstrapping =
getLeaderState().isFollowerBootstrapping(follower);
- final long leaderStartIndex = getRaftLog().getStartIndex();
- final TermIndex firstAvailable =
Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
- .orElseGet(() ->
TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
- if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
- // If the follower is bootstrapping and has not yet installed any
snapshot from leader, then the follower should
- // be notified to install a snapshot. Every follower should try to
install at least one snapshot during
- // bootstrapping, if available.
- LOG.debug("{}: follower is bootstrapping, notify to install snapshot to
{}.", this, firstAvailable);
- return firstAvailable;
- }
-
- final long followerNextIndex = follower.getNextIndex();
- if (followerNextIndex >= leaderNextIndex) {
- return null;
- }
-
- if (followerNextIndex < leaderStartIndex) {
- // The Leader does not have the logs from the Follower's last log
- // index onwards. And install snapshot is disabled. So the Follower
- // should be notified to install the latest snapshot through its
- // State Machine.
- return firstAvailable;
- } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
- // Leader has no logs to check from, hence return next index.
- return firstAvailable;
- }
-
- return null;
- }
static class AppendEntriesRequest {
private final Timekeeper timer;
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index cff5425d3..33914fde7 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -134,38 +134,89 @@ public interface LogAppender {
/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for
sending the given snapshot. */
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String
requestId, SnapshotInfo snapshot);
+ /**
+ * Get the previous {@link TermIndex} for the given next index.
+ * This is used to set the previous log entry in AppendEntries requests.
+ *
+ * @return the previous {@link TermIndex}, or null if unavailable
+ * (e.g. the entry has been purged and the snapshot does not cover
it).
+ */
+ default TermIndex getPrevious(long nextIndex) {
+ if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
+ return null;
+ }
+
+ final long previousIndex = nextIndex - 1;
+ final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
+ if (previous != null) {
+ return previous;
+ }
+
+ final SnapshotInfo snapshot =
getServer().getStateMachine().getLatestSnapshot();
+ if (snapshot != null) {
+ final TermIndex snapshotTermIndex = snapshot.getTermIndex();
+ if (snapshotTermIndex.getIndex() == previousIndex) {
+ return snapshotTermIndex;
+ }
+ }
+
+ return null;
+ }
+
/**
* Should this {@link LogAppender} send a snapshot to the follower?
*
* @return the snapshot if it should install a snapshot; otherwise, return
null.
*/
default SnapshotInfo shouldInstallSnapshot() {
- // we should install snapshot if the follower needs to catch up and:
- // 1. there is no local log entry but there is snapshot
- // 2. or the follower's next index is smaller than the log start index
- // 3. or the follower is bootstrapping (i.e. not yet caught up) and has
not installed any snapshot yet
- final FollowerInfo follower = getFollower();
- final boolean isFollowerBootstrapping =
getLeaderState().isFollowerBootstrapping(follower);
final SnapshotInfo snapshot =
getServer().getStateMachine().getLatestSnapshot();
+ return shouldInstallSnapshot(snapshot != null) ? snapshot : null;
+ }
+
+ /**
+ * Should this {@link LogAppender} send a snapshot notification to the
follower?
+ *
+ * @return the first available log {@link TermIndex} if it should install a
snapshot; otherwise, return null.
+ */
+ default TermIndex shouldNotifyToInstallSnapshot() {
+ if (!shouldInstallSnapshot(true)) {
+ return null;
+ }
+ final TermIndex start =
getRaftLog().getTermIndex(getRaftLog().getStartIndex());
+ if (start != null) {
+ return start;
+ }
+ // No log is currently available; return the next, which will become
available in the future.
+ return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
getRaftLog().getNextIndex());
+ }
- if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
- if (snapshot == null) {
+ default boolean shouldInstallSnapshot(boolean hasSnapshot) {
+ final FollowerInfo follower = getFollower();
+ if (getLeaderState().isFollowerBootstrapping(follower)
+ && !follower.hasAttemptedToInstallSnapshot()) {
+ if (!hasSnapshot) {
// Leader cannot send null snapshot to follower. Hence, acknowledge
InstallSnapshot attempt (even though it
// was not attempted) so that follower can come out of staging state
after appending log entries.
follower.setAttemptedToInstallSnapshot();
- } else {
- return snapshot;
}
+ return true;
}
+ final long leaderNextIndex = getRaftLog().getNextIndex();
final long followerNextIndex = getFollower().getNextIndex();
- if (followerNextIndex < getRaftLog().getNextIndex()) {
- final long logStartIndex = getRaftLog().getStartIndex();
- if (followerNextIndex < logStartIndex || (logStartIndex ==
RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
- return snapshot;
- }
+ if (followerNextIndex >= leaderNextIndex) {
+ // follower caught up already
+ return false;
}
- return null;
+ final long leaderStartIndex = getRaftLog().getStartIndex();
+ if (followerNextIndex < leaderStartIndex || leaderStartIndex ==
RaftLog.INVALID_LOG_INDEX) {
+ // leader does not have follower's next log
+ return true;
+ }
+ // leader does not have the previous log for appendEntries
+ return followerNextIndex == leaderStartIndex &&
+ followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX &&
+ getPrevious(followerNextIndex) == null;
}
/** Define how this {@link LogAppender} should run. */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index be0404da3..4f558e0c7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -165,27 +165,6 @@ public abstract class LogAppenderBase implements
LogAppender {
return false;
}
- private TermIndex getPrevious(long nextIndex) {
- if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
- return null;
- }
-
- final long previousIndex = nextIndex - 1;
- final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
- if (previous != null) {
- return previous;
- }
-
- final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
- if (snapshot != null) {
- final TermIndex snapshotTermIndex = snapshot.getTermIndex();
- if (snapshotTermIndex.getIndex() == previousIndex) {
- return snapshotTermIndex;
- }
- }
-
- return null;
- }
protected long getNextIndexForInconsistency(long requestFirstIndex, long
replyNextIndex) {
long next = replyNextIndex;
@@ -238,6 +217,14 @@ public abstract class LogAppenderBase implements
LogAppender {
final long snapshotIndex = follower.getSnapshotIndex();
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
+
+ if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX &&
followerNext != snapshotIndex + 1) {
+ LOG.info("{}: Skipping appendEntries since the previous log entry is
unavailable:" +
+ " follower {} nextIndex={} and snapshotIndex={} but leader
startIndex={}",
+ this, follower.getName(), followerNext, snapshotIndex,
getRaftLog().getStartIndex());
+ return null;
+ }
+
final long halfMs = heartbeatWaitTimeMs/2;
for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; ) {
if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index c7a7849e6..c9b19a72a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -53,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge;
import org.slf4j.event.Level;
@@ -222,4 +224,94 @@ public abstract class LogAppenderTests<CLUSTER extends
MiniRaftCluster>
Assertions.assertNotNull(last);
Assertions.assertTrue(last.getIndex() <=
leader.getInfo().getLastAppliedIndex());
}
+
+ @Test
+ public void testNewAppendEntriesRequestAfterPurgeFollowerBehindStartIndex()
throws Exception {
+ final RaftProperties prop = getProperties();
+ RaftServerConfigKeys.Log.setPurgeGap(prop, 1);
+ RaftServerConfigKeys.Log.setSegmentSizeMax(prop,
SizeInBytes.valueOf("1KB"));
+ runWithNewCluster(3, cluster -> {
+ final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
+ // Test when followerNextIndex < leader's logStartIndex
+ runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge -
1);
+ });
+ }
+
+ @Test
+ public void testNewAppendEntriesRequestAfterPurgeFollowerAtStartIndex()
throws Exception {
+ final RaftProperties prop = getProperties();
+ RaftServerConfigKeys.Log.setPurgeGap(prop, 1);
+ RaftServerConfigKeys.Log.setSegmentSizeMax(prop,
SizeInBytes.valueOf("1KB"));
+ runWithNewCluster(3, cluster -> {
+ final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
+ // Test when followerNextIndex == leader's logStartIndex, but the
previous index is already purged
+ runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge);
+ });
+ }
+
+ private long setupPurgedLeaderLog(CLUSTER cluster) throws Exception {
+ final RaftServer.Division leader = waitForLeader(cluster);
+ final RaftLog leaderLog = leader.getRaftLog();
+
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ for (SimpleMessage msg : generateMsgs(5)) {
+ client.io().send(msg);
+ }
+ }
+
+ final long lastLogIndex = leaderLog.getLastEntryTermIndex().getIndex();
+ LOG.info("Leader log lastIndex={}, startIndex={}", lastLogIndex,
leaderLog.getStartIndex());
+ Assertions.assertTrue(lastLogIndex > 5, "Need enough log entries for the
test");
+
+ // Take a snapshot so that shouldInstallSnapshot() can return it
+ final long snapshotIndex =
SimpleStateMachine4Testing.get(leader).takeSnapshot();
+ LOG.info("Snapshot taken at index {}", snapshotIndex);
+ Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been
taken");
+
+ final long purgeUpTo = lastLogIndex - 2;
+ LOG.info("Purging leader log up to index {}", purgeUpTo);
+ leaderLog.purge(purgeUpTo).get();
+
+ final long startIndexAfterPurge = leaderLog.getStartIndex();
+ LOG.info("Leader log after purge: startIndex={}", startIndexAfterPurge);
+ Assertions.assertTrue(startIndexAfterPurge > 1,
+ "Purge should have advanced startIndex, but got " +
startIndexAfterPurge);
+
+ return startIndexAfterPurge;
+ }
+
+ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster,
+ long targetNextIndex) throws Exception {
+ final RaftServer.Division leader = waitForLeader(cluster);
+ final RaftLog leaderLog = leader.getRaftLog();
+ final long startIndexAfterPurge = leaderLog.getStartIndex();
+
+ final Stream<LogAppender> appenders =
RaftServerTestUtil.getLogAppenders(leader);
+ Assertions.assertNotNull(appenders, "Leader should have log appenders");
+ final LogAppender appender = appenders.findFirst().orElseThrow(
+ () -> new AssertionError("No log appender found"));
+
+ Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX,
+ "targetNextIndex should be > LEAST_VALID_LOG_INDEX");
+ appender.getFollower().setNextIndex(targetNextIndex);
+
+ LOG.info("Set follower nextIndex={}, startIndexAfterPurge={},
snapshotIndex={}",
+ targetNextIndex, startIndexAfterPurge,
appender.getFollower().getSnapshotIndex());
+ Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(),
+ "Follower snapshotIndex should be 0 (default, never installed
snapshot)");
+
+ Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1),
+ "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been
purged");
+
+ // Should return null instead of throwing NPE
+ Assertions.assertNull(appender.newAppendEntriesRequest(0, false),
+ "newAppendEntriesRequest should return null when previous TermIndex is
not found");
+
+ Assertions.assertEquals(targetNextIndex,
appender.getFollower().getNextIndex(),
+ "Follower nextIndex should remain unchanged");
+
+ Assertions.assertNotNull(appender.shouldInstallSnapshot(),
+ "shouldInstallSnapshot should return non-null when followerNextIndex ("
+ + targetNextIndex + ") and previous entry has been purged");
+ }
}