This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b06970917 RATIS-2434. Regression test for data race between 
SegmentedRaftLog.get() and LogSegment.append(). (#1378)
b06970917 is described below

commit b069709177bfd0d681cf7d47097c13668ba12b97
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Fri Mar 13 19:39:29 2026 -0700

    RATIS-2434. Regression test for data race between SegmentedRaftLog.get() 
and LogSegment.append(). (#1378)
---
 .../ratis/server/raftlog/segmented/LogSegment.java |  7 +++
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 73 ++++++++++++++++++++++
 2 files changed, 80 insertions(+)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c40b91f70..e9cb2e50f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import org.apache.ratis.util.CodeInjectionForTesting;
 
 
 /**
@@ -357,6 +358,7 @@ public final class LogSegment {
     append(true, entry, op);
   }
 
+  public static final String APPEND_RECORD = LogSegment.class.getSimpleName() 
+ ".append";
   private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
     Objects.requireNonNull(entry, "entry == null");
     final LogRecord currentLast = records.getLast();
@@ -371,9 +373,14 @@ public final class LogSegment {
 
     final LogRecord record = new LogRecord(totalFileSize, entry);
     if (keepEntryInCache) {
+      // It is important to put the entry into the cache before appending the
+      // record to the record list. Otherwise, a reader thread may get the
+      // record from the list but not the entry from the cache.
       putEntryCache(record.getTermIndex(), entry, op);
+      CodeInjectionForTesting.execute(APPEND_RECORD, this, 
record.getTermIndex());
     }
     records.append(record);
+
     totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
   }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index a772b0002..43aafc896 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -75,6 +76,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import static java.lang.Boolean.FALSE;
@@ -873,4 +876,74 @@ public class TestSegmentedRaftLog extends BaseTest {
           " ns with asyncFlush " + useAsyncFlush);
     }
   }
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(TestSegmentedRaftLog.class);
+  @Test
+  public void testConcurrentGetDuringAppend() throws Exception {
+    RaftServerConfigKeys.Log.setReadLockEnabled(properties, false);
+    final CountDownLatch injectionPaused = new CountDownLatch(1);
+    final CountDownLatch readerCanProceed = new CountDownLatch(1);
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    final AtomicReference<LogEntryProto> readEntry = new AtomicReference<>();
+
+    final CodeInjectionForTesting.Code code = (localId, remoteId, args) -> {
+      // in log worker thread, holding write lock
+      injectionPaused.countDown();
+      try {
+        if (!readerCanProceed.await(5, TimeUnit.SECONDS)) {
+          error.set(new TimeoutException("The reader thread did not start in 
time."));
+        }
+      } catch (InterruptedException e) {
+        error.set(e);
+      }
+      return true;
+    };
+
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+      CodeInjectionForTesting.put(LogSegment.APPEND_RECORD, code);
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      final LogEntryProto newEntry = prepareLogEntry(1, 0, () -> "newEntry", 
false);
+
+      // Run appendEntry asynchronously.
+      final Thread appender = new Thread(() -> {
+        raftLog.appendEntry(newEntry).join();
+      });
+      appender.start();
+
+      // Wait until the append operation is paused at the injection point.
+      Assertions.assertTrue(injectionPaused.await(5, TimeUnit.SECONDS), 
"Injection point was not hit.");
+
+      // Start a new reader thread to call get().
+      // This thread will block until the write lock is released.
+      final Thread reader = new Thread(() -> {
+        try {
+          readEntry.set(raftLog.get(newEntry.getIndex()));
+          Assertions.assertNull(readEntry.get());
+          // Unblock the writer thread.
+          readerCanProceed.countDown();
+        } catch (Throwable t) {
+          error.set(t);
+        }
+      });
+      reader.start();
+
+      // Wait for both the append and the read to complete.
+      reader.join();
+      appender.join();
+
+      // Check for errors.
+      if (error.get() != null) {
+        throw new Exception("Test failed", error.get());
+      }
+
+      // When the reader's get() call completed, the append was fully finished,
+      // so it should have returned the correct entry.
+      Assertions.assertEquals(newEntry.getIndex(), 
raftLog.getLastEntryTermIndex().getIndex());
+      readEntry.set(raftLog.get(newEntry.getIndex()));
+      Assertions.assertNotNull(readEntry.get());
+      Assertions.assertEquals(newEntry, readEntry.get());
+    } finally {
+      CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD);
+    }
+  }
 }

Reply via email to