This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 9fcfe546d13 KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex (#19972)
9fcfe546d13 is described below
commit 9fcfe546d1313b77ad5dc6b10be3095fe6d32569
Author: Okada Haruki <[email protected]>
AuthorDate: Wed Jun 25 01:35:53 2025 +0900
KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex (#19972)
## Summary
- Fix a potential race condition in LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in non-monotonic offsets and cause replication to stop (a simplified sketch of the race follows below).
- See https://issues.apache.org/jira/browse/KAFKA-19407 for the details of how it happens.
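As an illustration only (a minimal, hypothetical sketch with made-up names, not the actual LogSegment code): an unsynchronized lazy initialization of a volatile field can run the initializer more than once when two threads race past the UNKNOWN check. This is the shape of the bug, and of the double-checked-locking fix in the diff below.

```java
// Hypothetical, simplified model; LazyMaxTimestamp, loadLastEntry, etc. are illustrative names.
class LazyMaxTimestamp {
    private static final long UNKNOWN = -1L;
    private volatile long maxTimestampSoFar = UNKNOWN;
    private final Object lock = new Object();

    // Pre-fix shape: two threads can both observe UNKNOWN and both call loadLastEntry(),
    // so the "initialized only once" assumption breaks under concurrent access.
    long readUnsafe() {
        if (maxTimestampSoFar == UNKNOWN)
            maxTimestampSoFar = loadLastEntry();
        return maxTimestampSoFar;
    }

    // Post-fix shape: double-checked locking keeps the common path lock-free once the
    // value is cached, while the second check under the monitor guarantees a single load.
    long readSafe() {
        if (maxTimestampSoFar == UNKNOWN) {
            synchronized (lock) {
                if (maxTimestampSoFar == UNKNOWN)
                    maxTimestampSoFar = loadLastEntry();
            }
        }
        return maxTimestampSoFar;
    }

    // Stand-in for timeIndex().lastEntry(): assume it must not run more than once.
    private long loadLastEntry() {
        return System.currentTimeMillis();
    }
}
```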
Reviewers: Vincent PÉRICART <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/storage/internals/log/LogSegment.java | 18 +++++++--
.../storage/internals/log/LogSegmentTest.java | 45 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index b388af1f798..e07d8a2d6a6 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -93,6 +93,9 @@ public class LogSegment implements Closeable {
// NOTED: the offset is the last offset of batch having the max timestamp.
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;
+ // Lock for maxTimestampAndOffsetSoFar to ensure that it will be initialized only once
+ private final Object maxTimestampAndOffsetLock = new Object();
+
private long created;
/* the number of bytes since we last added an entry in the offset index */
@@ -177,7 +180,7 @@ public class LogSegment implements Closeable {
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
if (offsetIndexFile().exists()) {
// Resize the time index file to 0 if it is newly created.
- if (timeIndexFileNewlyCreated)
+ if (timeIndexFileNewlyCreated)
timeIndex().resize(0);
// Sanity checks for time index and offset index are skipped because
// we will recover the segments above the recovery point in recoverLog()
@@ -192,8 +195,17 @@ public class LogSegment implements Closeable {
* the time index).
*/
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
- if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
- maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+ if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+ // As stated in LogSegment class javadoc, this class is not thread-safe so basically we assume that
+ // methods are called within UnifiedLog#lock.
+ // However, there's exceptional paths where this method can be called outside of the lock,
+ // so we need lock here to prevent multiple threads trying to modify maxTimestampAndOffsetSoFar
+ synchronized (maxTimestampAndOffsetLock) {
+ if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
+ maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
+ }
+ }
+ }
return maxTimestampAndOffsetSoFar;
}
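As an aside on the design choice (a hypothetical caller-side sketch, not part of the patch): because maxTimestampAndOffsetSoFar stays volatile, the common path remains lock-free once the value is cached; the monitor is only taken by the first reader(s) that still observe TimestampOffset.UNKNOWN, and concurrent readers all observe a single initialization. For example, inside a method declared `throws Exception` and given an existing LogSegment `seg` whose cached entry is still UNKNOWN:

    // Hypothetical concurrent callers; assumes `seg` is a LogSegment not yet initialized.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<TimestampOffset>> reads = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
        reads.add(pool.submit(seg::readMaxTimestampAndOffsetSoFar));
    }
    for (Future<TimestampOffset> read : reads) {
        read.get();  // every caller sees the same entry; lastEntry() is loaded only once
    }
    pool.shutdown();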
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index b798378f1af..64064e2bee5 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -59,6 +60,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -67,7 +72,11 @@ import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class LogSegmentTest {
@@ -859,6 +868,42 @@ public class LogSegmentTest {
assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1));
}
+ @Test
+ @Timeout(30)
+ public void testConcurrentAccessToMaxTimestampSoFar() throws Exception {
+ int numThreads = 16;
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ TimeIndex mockTimeIndex = mock(TimeIndex.class);
+ when(mockTimeIndex.lastEntry()).thenReturn(new TimestampOffset(RecordBatch.NO_TIMESTAMP, 0L));
+
+ try {
+ // to reproduce race, we iterate test for certain duration
+ long remainingDurationNanos = Duration.ofSeconds(1).toNanos();
+ while (remainingDurationNanos > 0) {
+ long t0 = System.nanoTime();
+ clearInvocations(mockTimeIndex);
+ try (LogSegment seg = spy(LogTestUtils.createSegment(0, logDir, 10, Time.SYSTEM))) {
+ when(seg.timeIndex()).thenReturn(mockTimeIndex);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(executor.submit(() -> assertDoesNotThrow(seg::maxTimestampSoFar)));
+ }
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ // timeIndex.lastEntry should be called once if no race
+ verify(mockTimeIndex, times(1)).lastEntry();
+
+ long elapsedNanos = System.nanoTime() - t0;
+ remainingDurationNanos -= elapsedNanos;
+ }
+ }
+ } finally {
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+ }
+
private ProducerStateManager newProducerStateManager() throws IOException {
return new ProducerStateManager(
topicPartition,