This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 39acebf88bc2a015c0f86f48d9334cb86a820893
Author: GewuNewOne <[email protected]>
AuthorDate: Wed Apr 23 00:23:24 2025 +0800

    RATIS-2278. Follower Fails to Append Entries Due to Index Validation Race 
Condition in NavigableIndices (#1248)
---
 .../org/apache/ratis/server/impl/RaftServerImpl.java  |  5 ++++-
 .../org/apache/ratis/server/impl/ServerImplUtils.java | 19 ++++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0186841ef..9b9e204d4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1634,7 +1634,10 @@ class RaftServerImpl implements RaftServer.Division,
   }
   private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
     final List<ConsecutiveIndices> entriesTermIndices = 
ConsecutiveIndices.convert(entries);
-    appendLogTermIndices.append(entriesTermIndices);
+    if (!appendLogTermIndices.append(entriesTermIndices)) {
+      // index already exists, return the last future
+      return appendLogFuture.get();
+    }
 
     return appendLogFuture.updateAndGet(f -> f.thenCompose(
             ignored -> JavaUtils.allOf(state.getLog().append(entries))))
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index c5010a534..ce4702d95 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -136,15 +136,20 @@ public final class ServerImplUtils {
       return floorEntry.getValue().getTerm(index);
     }
 
-    synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
-      for(ConsecutiveIndices indices : entriesTermIndices) {
-        // validate startIndex
-        final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
-        if (lastEntry != null) {
-          Preconditions.assertSame(lastEntry.getValue().getNextIndex(), 
indices.startIndex, "startIndex");
+    synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
+      for(int i = 0; i < entriesTermIndices.size(); i++) {
+        final ConsecutiveIndices indices = entriesTermIndices.get(i);
+        final ConsecutiveIndices previous = map.put(indices.startIndex, 
indices);
+        if (previous != null) {
+          // index already exists, revert this append
+          map.put(previous.startIndex, previous);
+          for(int j = 0; j < i; j++) {
+            map.remove(entriesTermIndices.get(j).startIndex);
+          }
+          return false;
         }
-        map.put(indices.startIndex, indices);
       }
+      return true;
     }
 
     synchronized void removeExisting(List<ConsecutiveIndices> 
entriesTermIndices) {

Reply via email to