This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b06970917 RATIS-2434. Regression test for data race between
SegmentedRaftLog.get() and LogSegment.append(). (#1378)
b06970917 is described below
commit b069709177bfd0d681cf7d47097c13668ba12b97
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Fri Mar 13 19:39:29 2026 -0700
RATIS-2434. Regression test for data race between SegmentedRaftLog.get()
and LogSegment.append(). (#1378)
---
.../ratis/server/raftlog/segmented/LogSegment.java | 7 +++
.../raftlog/segmented/TestSegmentedRaftLog.java | 73 ++++++++++++++++++++++
2 files changed, 80 insertions(+)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c40b91f70..e9cb2e50f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import org.apache.ratis.util.CodeInjectionForTesting;
/**
@@ -357,6 +358,7 @@ public final class LogSegment {
append(true, entry, op);
}
+ public static final String APPEND_RECORD = LogSegment.class.getSimpleName()
+ ".append"; // Key of the CodeInjectionForTesting hook that append(...) executes after putEntryCache(...) and before records.append(record).
private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
Objects.requireNonNull(entry, "entry == null");
final LogRecord currentLast = records.getLast();
@@ -371,9 +373,14 @@ public final class LogSegment {
final LogRecord record = new LogRecord(totalFileSize, entry);
if (keepEntryInCache) {
+ // It is important to put the entry into the cache before appending the
+ // record to the record list. Otherwise, a reader thread may get the
+ // record from the list but not the entry from the cache.
putEntryCache(record.getTermIndex(), entry, op);
+ CodeInjectionForTesting.execute(APPEND_RECORD, this,
record.getTermIndex());
}
records.append(record);
+
totalFileSize += getEntrySize(entry, op);
endIndex = entry.getIndex();
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index a772b0002..43aafc896 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -75,6 +76,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import static java.lang.Boolean.FALSE;
@@ -873,4 +876,74 @@ public class TestSegmentedRaftLog extends BaseTest {
" ns with asyncFlush " + useAsyncFlush);
}
}
+
+ public static final Logger LOG =
LoggerFactory.getLogger(TestSegmentedRaftLog.class); // NOTE(review): not referenced by the test method added in this hunk -- confirm it is used elsewhere in the class.
+ @Test
+ public void testConcurrentGetDuringAppend() throws Exception { // RATIS-2434 regression test: get() racing with an append paused at LogSegment.APPEND_RECORD.
+ RaftServerConfigKeys.Log.setReadLockEnabled(properties, false); // presumably lets get() run without taking the read lock -- confirm against RaftServerConfigKeys
+ final CountDownLatch injectionPaused = new CountDownLatch(1); // counted down by the injected code once the append reaches the hook
+ final CountDownLatch readerCanProceed = new CountDownLatch(1); // counted down by the reader thread; the paused append waits on it
+ final AtomicReference<Throwable> error = new AtomicReference<>(); // failure recorded by the injected code or by the reader thread
+ final AtomicReference<LogEntryProto> readEntry = new AtomicReference<>(); // most recent result of raftLog.get(...)
+ // Injected code: report that the hook was reached, then wait up to 5 seconds for the reader to finish its get().
+ final CodeInjectionForTesting.Code code = (localId, remoteId, args) -> {
+ // Original note: runs in the log worker thread, holding the write lock -- NOTE(review): not verifiable from this diff, confirm.
+ injectionPaused.countDown();
+ try {
+ if (!readerCanProceed.await(5, TimeUnit.SECONDS)) {
+ error.set(new TimeoutException("The reader thread did not start in
time."));
+ }
+ } catch (InterruptedException e) {
+ error.set(e); // NOTE(review): the interrupt flag is not restored here
+ }
+ return true;
+ };
+ // The hook is registered inside the try block and removed in the finally clause, so it is cleared on every exit path.
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
+ CodeInjectionForTesting.put(LogSegment.APPEND_RECORD, code);
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ final LogEntryProto newEntry = prepareLogEntry(1, 0, () -> "newEntry",
false);
+ // NOTE(review): an exception thrown inside the appender thread is not stored in error, so the error check below will not report it.
+ // Run appendEntry on its own thread and wait there for the returned future to complete.
+ final Thread appender = new Thread(() -> {
+ raftLog.appendEntry(newEntry).join();
+ });
+ appender.start();
+
+ // Wait (up to 5 seconds) until the append operation is paused at the injection point.
+ Assertions.assertTrue(injectionPaused.await(5, TimeUnit.SECONDS),
"Injection point was not hit.");
+ // NOTE(review): if assertNull below fails, countDown() is skipped; the injected code then times out and its error.set(...) can replace the AssertionError stored by the catch block.
+ // Start a reader thread that calls get() while the append is still paused at the injection point.
+ // get() must return without waiting for the append: the appender is released by the countDown() below (or by its 5 second timeout, which fails the test).
+ final Thread reader = new Thread(() -> {
+ try {
+ readEntry.set(raftLog.get(newEntry.getIndex()));
+ Assertions.assertNull(readEntry.get()); // the hook runs before records.append(record) -- presumably why get() is expected to yield null here
+ // Release the paused append.
+ readerCanProceed.countDown();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ });
+ reader.start();
+
+ // Wait for both the read and the append to complete (these joins have no timeout).
+ reader.join();
+ appender.join();
+
+ // Fail the test if the injected code or the reader thread recorded a failure.
+ if (error.get() != null) {
+ throw new Exception("Test failed", error.get());
+ }
+
+ // Both threads have been joined, so the append has finished;
+ // a fresh get() must now return the appended entry.
+ Assertions.assertEquals(newEntry.getIndex(),
raftLog.getLastEntryTermIndex().getIndex());
+ readEntry.set(raftLog.get(newEntry.getIndex()));
+ Assertions.assertNotNull(readEntry.get());
+ Assertions.assertEquals(newEntry, readEntry.get());
+ } finally {
+ CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD);
+ }
+ }
}