This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 98a4c483b RATIS-2487. Trigger installSnapshot if leader cannot get previous entry (#1420)
98a4c483b is described below

commit 98a4c483b78980948f87236995f13885b9cc5a4a
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Apr 4 02:01:22 2026 +0800

    RATIS-2487. Trigger installSnapshot if leader cannot get previous entry (#1420)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 38 ---------
 .../apache/ratis/server/leader/LogAppender.java    | 83 +++++++++++++++----
 .../ratis/server/leader/LogAppenderBase.java       | 29 ++-----
 .../java/org/apache/ratis/LogAppenderTests.java    | 92 ++++++++++++++++++++++
 4 files changed, 167 insertions(+), 75 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b4d78c207..69421e9f0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -832,44 +832,6 @@ public class GrpcLogAppender extends LogAppenderBase {
     responseHandler.waitForResponse();
   }
 
-  /**
-   * Should the Leader notify the Follower to install the snapshot through
-   * its own State Machine.
-   * @return the first available log's start term index
-   */
-  private TermIndex shouldNotifyToInstallSnapshot() {
-    final FollowerInfo follower = getFollower();
-    final long leaderNextIndex = getRaftLog().getNextIndex();
-    final boolean isFollowerBootstrapping = 
getLeaderState().isFollowerBootstrapping(follower);
-    final long leaderStartIndex = getRaftLog().getStartIndex();
-    final TermIndex firstAvailable = 
Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
-        .orElseGet(() -> 
TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
-    if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
-      // If the follower is bootstrapping and has not yet installed any 
snapshot from leader, then the follower should
-      // be notified to install a snapshot. Every follower should try to 
install at least one snapshot during
-      // bootstrapping, if available.
-      LOG.debug("{}: follower is bootstrapping, notify to install snapshot to 
{}.", this, firstAvailable);
-      return firstAvailable;
-    }
-
-    final long followerNextIndex = follower.getNextIndex();
-    if (followerNextIndex >= leaderNextIndex) {
-      return null;
-    }
-
-    if (followerNextIndex < leaderStartIndex) {
-      // The Leader does not have the logs from the Follower's last log
-      // index onwards. And install snapshot is disabled. So the Follower
-      // should be notified to install the latest snapshot through its
-      // State Machine.
-      return firstAvailable;
-    } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
-      // Leader has no logs to check from, hence return next index.
-      return firstAvailable;
-    }
-
-    return null;
-  }
 
   static class AppendEntriesRequest {
     private final Timekeeper timer;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index cff5425d3..33914fde7 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -134,38 +134,89 @@ public interface LogAppender {
   /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for 
sending the given snapshot. */
   Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String 
requestId, SnapshotInfo snapshot);
 
+  /**
+   * Get the previous {@link TermIndex} for the given next index.
+   * This is used to set the previous log entry in AppendEntries requests.
+   *
+   * @return the previous {@link TermIndex}, or null if unavailable
+   *         (e.g. the entry has been purged and the snapshot does not cover 
it).
+   */
+  default TermIndex getPrevious(long nextIndex) {
+    if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
+      return null;
+    }
+
+    final long previousIndex = nextIndex - 1;
+    final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
+    if (previous != null) {
+      return previous;
+    }
+
+    final SnapshotInfo snapshot = 
getServer().getStateMachine().getLatestSnapshot();
+    if (snapshot != null) {
+      final TermIndex snapshotTermIndex = snapshot.getTermIndex();
+      if (snapshotTermIndex.getIndex() == previousIndex) {
+        return snapshotTermIndex;
+      }
+    }
+
+    return null;
+  }
+
   /**
    * Should this {@link LogAppender} send a snapshot to the follower?
    *
    * @return the snapshot if it should install a snapshot; otherwise, return 
null.
    */
   default SnapshotInfo shouldInstallSnapshot() {
-    // we should install snapshot if the follower needs to catch up and:
-    // 1. there is no local log entry but there is snapshot
-    // 2. or the follower's next index is smaller than the log start index
-    // 3. or the follower is bootstrapping (i.e. not yet caught up) and has 
not installed any snapshot yet
-    final FollowerInfo follower = getFollower();
-    final boolean isFollowerBootstrapping = 
getLeaderState().isFollowerBootstrapping(follower);
     final SnapshotInfo snapshot = 
getServer().getStateMachine().getLatestSnapshot();
+    return shouldInstallSnapshot(snapshot != null) ? snapshot : null;
+  }
+
+  /**
+   * Should this {@link LogAppender} send a snapshot notification to the 
follower?
+   *
+   * @return the first available log {@link TermIndex} if it should install a 
snapshot; otherwise, return null.
+   */
+  default TermIndex shouldNotifyToInstallSnapshot() {
+    if (!shouldInstallSnapshot(true)) {
+      return null;
+    }
+    final TermIndex start = 
getRaftLog().getTermIndex(getRaftLog().getStartIndex());
+    if (start != null) {
+      return start;
+    }
+    // No log is currently available; return the next, which will become 
available in the future.
+    return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), 
getRaftLog().getNextIndex());
+  }
 
-    if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
-      if (snapshot == null) {
+  default boolean shouldInstallSnapshot(boolean hasSnapshot) {
+    final FollowerInfo follower = getFollower();
+    if (getLeaderState().isFollowerBootstrapping(follower)
+        && !follower.hasAttemptedToInstallSnapshot()) {
+      if (!hasSnapshot) {
         // Leader cannot send null snapshot to follower. Hence, acknowledge 
InstallSnapshot attempt (even though it
         // was not attempted) so that follower can come out of staging state 
after appending log entries.
         follower.setAttemptedToInstallSnapshot();
-      } else {
-        return snapshot;
       }
+      return true;
     }
 
+    final long leaderNextIndex = getRaftLog().getNextIndex();
     final long followerNextIndex = getFollower().getNextIndex();
-    if (followerNextIndex < getRaftLog().getNextIndex()) {
-      final long logStartIndex = getRaftLog().getStartIndex();
-      if (followerNextIndex < logStartIndex || (logStartIndex == 
RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
-        return snapshot;
-      }
+    if (followerNextIndex >= leaderNextIndex) {
+      // follower caught up already
+      return false;
     }
-    return null;
+    final long leaderStartIndex = getRaftLog().getStartIndex();
+    if (followerNextIndex < leaderStartIndex || leaderStartIndex == 
RaftLog.INVALID_LOG_INDEX) {
+      // leader does not have follower's next log
+      return true;
+    }
+    // leader does not have the previous log for appendEntries
+    return followerNextIndex == leaderStartIndex &&
+        followerNextIndex > RaftLog.LEAST_VALID_LOG_INDEX &&
+        getPrevious(followerNextIndex) == null;
   }
 
   /** Define how this {@link LogAppender} should run. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index be0404da3..4f558e0c7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -165,27 +165,6 @@ public abstract class LogAppenderBase implements LogAppender {
     return false;
   }
 
-  private TermIndex getPrevious(long nextIndex) {
-    if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
-      return null;
-    }
-
-    final long previousIndex = nextIndex - 1;
-    final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
-    if (previous != null) {
-      return previous;
-    }
-
-    final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-    if (snapshot != null) {
-      final TermIndex snapshotTermIndex = snapshot.getTermIndex();
-      if (snapshotTermIndex.getIndex() == previousIndex) {
-        return snapshotTermIndex;
-      }
-    }
-
-    return null;
-  }
 
   protected long getNextIndexForInconsistency(long requestFirstIndex, long 
replyNextIndex) {
     long next = replyNextIndex;
@@ -238,6 +217,14 @@ public abstract class LogAppenderBase implements LogAppender {
     final long snapshotIndex = follower.getSnapshotIndex();
     final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
+
+    if (previous == null && followerNext > RaftLog.LEAST_VALID_LOG_INDEX && 
followerNext != snapshotIndex + 1) {
+      LOG.info("{}: Skipping appendEntries since the previous log entry is 
unavailable:" +
+              " follower {} nextIndex={} and snapshotIndex={} but leader 
startIndex={}",
+          this, follower.getName(), followerNext, snapshotIndex, 
getRaftLog().getStartIndex());
+      return null;
+    }
+
     final long halfMs = heartbeatWaitTimeMs/2;
     for (long next = followerNext; leaderNext > next && 
getHeartbeatWaitTimeMs() - halfMs > 0; ) {
       if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index c7a7849e6..c9b19a72a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -53,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
 import org.apache.ratis.thirdparty.com.codahale.metrics.Gauge;
 import org.slf4j.event.Level;
@@ -222,4 +224,94 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
     Assertions.assertNotNull(last);
     Assertions.assertTrue(last.getIndex() <= 
leader.getInfo().getLastAppliedIndex());
   }
+
+  @Test
+  public void testNewAppendEntriesRequestAfterPurgeFollowerBehindStartIndex() 
throws Exception {
+    final RaftProperties prop = getProperties();
+    RaftServerConfigKeys.Log.setPurgeGap(prop, 1);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, 
SizeInBytes.valueOf("1KB"));
+    runWithNewCluster(3, cluster -> {
+      final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
+      // Test when followerNextIndex < leader's logStartIndex
+      runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge - 
1);
+    });
+  }
+
+  @Test
+  public void testNewAppendEntriesRequestAfterPurgeFollowerAtStartIndex() 
throws Exception {
+    final RaftProperties prop = getProperties();
+    RaftServerConfigKeys.Log.setPurgeGap(prop, 1);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, 
SizeInBytes.valueOf("1KB"));
+    runWithNewCluster(3, cluster -> {
+      final long startIndexAfterPurge = setupPurgedLeaderLog(cluster);
+      // Test when followerNextIndex == leader's logStartIndex, but the 
previous index is already purged
+      runTestNewAppendEntriesRequestAfterPurge(cluster, startIndexAfterPurge);
+    });
+  }
+
+  private long setupPurgedLeaderLog(CLUSTER cluster) throws Exception {
+    final RaftServer.Division leader = waitForLeader(cluster);
+    final RaftLog leaderLog = leader.getRaftLog();
+
+    try (RaftClient client = cluster.createClient(leader.getId())) {
+      for (SimpleMessage msg : generateMsgs(5)) {
+        client.io().send(msg);
+      }
+    }
+
+    final long lastLogIndex = leaderLog.getLastEntryTermIndex().getIndex();
+    LOG.info("Leader log lastIndex={}, startIndex={}", lastLogIndex, 
leaderLog.getStartIndex());
+    Assertions.assertTrue(lastLogIndex > 5, "Need enough log entries for the 
test");
+
+    // Take a snapshot so that shouldInstallSnapshot() can return it
+    final long snapshotIndex = 
SimpleStateMachine4Testing.get(leader).takeSnapshot();
+    LOG.info("Snapshot taken at index {}", snapshotIndex);
+    Assertions.assertTrue(snapshotIndex > 0, "Snapshot should have been 
taken");
+
+    final long purgeUpTo = lastLogIndex - 2;
+    LOG.info("Purging leader log up to index {}", purgeUpTo);
+    leaderLog.purge(purgeUpTo).get();
+
+    final long startIndexAfterPurge = leaderLog.getStartIndex();
+    LOG.info("Leader log after purge: startIndex={}", startIndexAfterPurge);
+    Assertions.assertTrue(startIndexAfterPurge > 1,
+        "Purge should have advanced startIndex, but got " + 
startIndexAfterPurge);
+
+    return startIndexAfterPurge;
+  }
+
+  void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster,
+      long targetNextIndex) throws Exception {
+    final RaftServer.Division leader = waitForLeader(cluster);
+    final RaftLog leaderLog = leader.getRaftLog();
+    final long startIndexAfterPurge = leaderLog.getStartIndex();
+
+    final Stream<LogAppender> appenders = 
RaftServerTestUtil.getLogAppenders(leader);
+    Assertions.assertNotNull(appenders, "Leader should have log appenders");
+    final LogAppender appender = appenders.findFirst().orElseThrow(
+        () -> new AssertionError("No log appender found"));
+
+    Assertions.assertTrue(targetNextIndex > RaftLog.LEAST_VALID_LOG_INDEX,
+        "targetNextIndex should be > LEAST_VALID_LOG_INDEX");
+    appender.getFollower().setNextIndex(targetNextIndex);
+
+    LOG.info("Set follower nextIndex={}, startIndexAfterPurge={}, 
snapshotIndex={}",
+        targetNextIndex, startIndexAfterPurge, 
appender.getFollower().getSnapshotIndex());
+    Assertions.assertEquals(0, appender.getFollower().getSnapshotIndex(),
+        "Follower snapshotIndex should be 0 (default, never installed 
snapshot)");
+
+    Assertions.assertNull(leaderLog.getTermIndex(targetNextIndex - 1),
+        "Entry at previousIndex=" + (targetNextIndex - 1) + " should have been 
purged");
+
+    // Should return null instead of throwing NPE
+    Assertions.assertNull(appender.newAppendEntriesRequest(0, false),
+        "newAppendEntriesRequest should return null when previous TermIndex is 
not found");
+
+    Assertions.assertEquals(targetNextIndex, 
appender.getFollower().getNextIndex(),
+        "Follower nextIndex should remain unchanged");
+
+    Assertions.assertNotNull(appender.shouldInstallSnapshot(),
+        "shouldInstallSnapshot should return non-null when followerNextIndex ("
+            + targetNextIndex + ") and previous entry has been purged");
+  }
 }

Reply via email to