This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 9fcfe546d13 KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex (#19972)
9fcfe546d13 is described below

commit 9fcfe546d1313b77ad5dc6b10be3095fe6d32569
Author: Okada Haruki <[email protected]>
AuthorDate: Wed Jun 25 01:35:53 2025 +0900

    KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex (#19972)
    
    ## Summary
    - Fix a potential race condition in
    LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in
    non-monotonic offsets and cause replication to stop.
    - See https://issues.apache.org/jira/browse/KAFKA-19407 for the details
    of how it happens.
    
    Reviewers: Vincent PÉRICART <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
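
For context, a minimal sketch of the pre-fix race (the class and method
names below are illustrative stand-ins, not the actual Kafka code):

    // Simplified model of the unsynchronized lazy initialization that this
    // commit replaces. append() runs under the log lock, but read() is also
    // reachable outside of it.
    class LazyMaxTimestampSketch {
        private volatile long maxTimestampSoFar = Long.MIN_VALUE; // "UNKNOWN"

        // Appenders call this while holding the log lock.
        synchronized void append(long timestamp) {
            read(); // lazily initialize before comparing
            if (timestamp > maxTimestampSoFar)
                maxTimestampSoFar = timestamp;
        }

        // Also callable outside the lock, which is where the race lives.
        long read() {
            if (maxTimestampSoFar == Long.MIN_VALUE) {
                // RACE: an appender may advance maxTimestampSoFar between the
                // check above and the store below; writing the stale on-disk
                // last entry here rolls the value backwards, and a later time
                // index append with the regressed value is non-monotonic.
                maxTimestampSoFar = lastEntryFromDisk();
            }
            return maxTimestampSoFar;
        }

        // Stand-in for timeIndex().lastEntry(), which can lag the in-memory
        // maximum because index entries are written only periodically.
        private long lastEntryFromDisk() { return 0L; }
    }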
---
 .../kafka/storage/internals/log/LogSegment.java    | 18 +++++++--
 .../storage/internals/log/LogSegmentTest.java      | 45 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index b388af1f798..e07d8a2d6a6 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -93,6 +93,9 @@ public class LogSegment implements Closeable {
     // NOTE: the offset is the last offset of the batch having the max timestamp.
     private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
 
+    // Lock for maxTimestampAndOffsetSoFar to ensure that it is initialized only once
+    private final Object maxTimestampAndOffsetLock = new Object();
+
     private long created;
 
     /* the number of bytes since we last added an entry in the offset index */
@@ -177,7 +180,7 @@ public class LogSegment implements Closeable {
     public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
         if (offsetIndexFile().exists()) {
             // Resize the time index file to 0 if it is newly created.
-            if (timeIndexFileNewlyCreated) 
+            if (timeIndexFileNewlyCreated)
                 timeIndex().resize(0);
             // Sanity checks for time index and offset index are skipped because
             // we will recover the segments above the recovery point in recoverLog()
@@ -192,8 +195,17 @@ public class LogSegment implements Closeable {
      * the time index).
      */
     public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
-        if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
-            maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+        if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+            // As stated in the LogSegment class javadoc, this class is not thread-safe, so we generally
+            // assume that methods are called within UnifiedLog#lock.
+            // However, there are exceptional paths where this method can be called outside of the lock,
+            // so we need a lock here to prevent multiple threads from racing to modify maxTimestampAndOffsetSoFar.
+            synchronized (maxTimestampAndOffsetLock) {
+                if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+                    maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+                }
+            }
+        }
         return maxTimestampAndOffsetSoFar;
     }
 
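
The fix above is the standard double-checked locking idiom; it is safe only
because maxTimestampAndOffsetSoFar is declared volatile. A generic,
self-contained sketch of the idiom (names here are illustrative):

    import java.util.function.Supplier;

    // Lazily computes a value at most once, even under concurrent access.
    // The volatile field guarantees that a thread taking the unsynchronized
    // fast path observes a fully initialized value.
    class LazyValue<T> {
        private final Object lock = new Object();
        private volatile T value; // null means "not yet initialized"

        T get(Supplier<T> supplier) {
            T v = value; // single volatile read on the fast path
            if (v == null) {
                synchronized (lock) {
                    v = value; // re-check: another thread may have won the race
                    if (v == null) {
                        value = v = supplier.get(); // runs at most once
                    }
                }
            }
            return v;
        }
    }

With this shape the supplier (timeIndex().lastEntry() in the patch) runs at
most once no matter how many threads race into the getter, which is what the
new test below asserts via verify(mockTimeIndex, times(1)).lastEntry().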
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index b798378f1af..64064e2bee5 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -59,6 +60,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -67,7 +72,11 @@ import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class LogSegmentTest {
@@ -859,6 +868,42 @@ public class LogSegmentTest {
         assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1));
     }
 
+    @Test
+    @Timeout(30)
+    public void testConcurrentAccessToMaxTimestampSoFar() throws Exception {
+        int numThreads = 16;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        TimeIndex mockTimeIndex = mock(TimeIndex.class);
+        when(mockTimeIndex.lastEntry()).thenReturn(new TimestampOffset(RecordBatch.NO_TIMESTAMP, 0L));
+
+        try {
+            // To reproduce the race, we repeat the test for a fixed duration
+            long remainingDurationNanos = Duration.ofSeconds(1).toNanos();
+            while (remainingDurationNanos > 0) {
+                long t0 = System.nanoTime();
+                clearInvocations(mockTimeIndex);
+                try (LogSegment seg = spy(LogTestUtils.createSegment(0, logDir, 10, Time.SYSTEM))) {
+                    when(seg.timeIndex()).thenReturn(mockTimeIndex);
+                    List<Future<?>> futures = new ArrayList<>();
+                    for (int i = 0; i < numThreads; i++) {
+                        futures.add(executor.submit(() -> assertDoesNotThrow(seg::maxTimestampSoFar)));
+                    }
+                    for (Future<?> future : futures) {
+                        future.get();
+                    }
+                    // timeIndex.lastEntry should be called exactly once if there is no race
+                    verify(mockTimeIndex, times(1)).lastEntry();
+
+                    long elapsedNanos = System.nanoTime() - t0;
+                    remainingDurationNanos -= elapsedNanos;
+                }
+            }
+        } finally {
+            executor.shutdown();
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+    }
+
     private ProducerStateManager newProducerStateManager() throws IOException {
         return new ProducerStateManager(
             topicPartition,

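For context on the exception named in the title: the time index rejects
entries whose timestamp goes backwards. A simplified sketch of that
invariant (illustrative, not the real TimeIndex implementation):

    // Simplified model of the monotonicity check in a time index. If a stale
    // maxTimestampAndOffsetSoFar rolls the tracked maximum backwards, a later
    // append can arrive with a smaller timestamp and trip this check.
    class TimeIndexSketch {
        private long lastTimestamp = -1L;

        void maybeAppend(long timestamp) {
            if (timestamp < lastTimestamp)
                throw new IllegalStateException("Attempt to append a timestamp ("
                    + timestamp + ") smaller than the last appended (" + lastTimestamp + ")");
            lastTimestamp = timestamp;
        }
    }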