This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e26e7c06e75 KAFKA-20091: Fix inconsistency in time-based retention 
checks between remote and local segment deletion logic. (#21352)
e26e7c06e75 is described below

commit e26e7c06e756e26022fb9f769f5f71fbc79c18b3
Author: Jian <[email protected]>
AuthorDate: Wed Jan 28 18:24:51 2026 +0800

    KAFKA-20091: Fix inconsistency in time-based retention checks between 
remote and local segment deletion logic. (#21352)
    
    When currentTime - largestTimestamp of a segment equals retentionMs, the
    behavior differs:
    
    1. Local segment deletion: **not deleted**
    
    ```
    // org.apache.kafka.storage.internals.log.UnifiedLog#deleteRetentionMsBreachedSegments
    boolean delete = startMs - segment.largestTimestamp() > retentionMs;
    ```
    
    2. Remote segment deletion: **deleted**
    ```
    // org.apache.kafka.server.log.remote.storage.RemoteLogManager.RLMExpirationTask.RemoteLogRetentionHandler#isSegmentBreachedByRetentionTime
    shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;

    // cleanupUntilMs is time.milliseconds() - retentionMs
    private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
        long cleanupUntilMs = time.milliseconds() - retentionMs;
        return retentionMs > -1 && cleanupUntilMs >= 0
                ? Optional.of(new RetentionTimeData(retentionMs, cleanupUntilMs))
                : Optional.empty();
    }
    ```
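    
    For illustration, here is a minimal standalone sketch (hypothetical values, not part of this patch) of the boundary case on which the two checks disagreed:
    
    ```
    // Hypothetical values chosen so that currentTime - largestTimestamp == retentionMs.
    long retentionMs = 1_000L;
    long largestTimestamp = 5_000L;                      // segment's max timestamp
    long currentTime = largestTimestamp + retentionMs;   // exactly at the retention boundary
    
    // Local check (strict '>'): the segment is NOT deleted.
    boolean localDelete = currentTime - largestTimestamp > retentionMs;   // false
    
    long cleanupUntilMs = currentTime - retentionMs;     // == largestTimestamp
    // Remote check before this fix ('<='): the same segment WAS deleted.
    boolean remoteDeleteBefore = largestTimestamp <= cleanupUntilMs;      // true
    // Remote check after this fix (strict '<'): matches the local behavior.
    boolean remoteDeleteAfter = largestTimestamp < cleanupUntilMs;        // false
    ```
    
    With this change, both paths treat a segment as expired only once currentTime - largestTimestamp strictly exceeds retentionMs.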
    
    cc @kamalcph
    
    Thanks for your comments on
    [KIP-1241](https://cwiki.apache.org/confluence/x/A4LMFw). While thinking
    through one of your comments and reading the code, I noticed a potential
    problem in this area, so I opened this PR. Many thanks.
    
    ---------
    
    Signed-off-by: stroller <[email protected]>
    
    Reviewers: Saket Ranjan <[email protected]>, Kamal Chandraprakash
     <[email protected]>
---
 .../log/remote/storage/RemoteLogManager.java       |  2 +-
 .../log/remote/storage/RemoteLogManagerTest.java   | 57 ++++++++++++++++------
 2 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 443f955a5fb..08c0f25b90c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1207,7 +1207,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
                 if (retentionTimeData.isEmpty()) {
                     return shouldDeleteSegment;
                 }
-                shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
+                shouldDeleteSegment = metadata.maxTimestampMs() < retentionTimeData.get().cleanupUntilMs;
                 if (shouldDeleteSegment) {
                     remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                     // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index c8efe9884d7..435c9958b5d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -2593,10 +2593,12 @@ public class RemoteLogManagerTest {
         verify(remoteStorageManager, times(1)).deleteLogSegmentData(remoteLogSegmentMetadata);
     }
 
-    @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
-    @CsvSource(value = {"0, -1", "-1, 0"})
+    // expectDeletion=false tests that segments with maxTimestampMs == cleanupUntilMs are NOT deleted (strict less-than)
+    @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1} expectDeletion={2}")
+    @CsvSource(value = {"0, -1, true", "-1, 0, true", "-1, 0, false"})
+    @CsvSource(value = {"0, -1, true", "-1, 0, true", "-1, 0, false"})
     public void testDeletionOnRetentionBreachedSegments(long retentionSize,
-                                                        long retentionMs)
+                                                        long retentionMs,
+                                                        boolean expectDeletion)
             throws RemoteStorageException, ExecutionException, InterruptedException {
         Map<String, Long> logProps = new HashMap<>();
         logProps.put("retention.bytes", retentionSize);
@@ -2630,19 +2632,27 @@ public class RemoteLogManagerTest {
 
 
         RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+        if (expectDeletion) {
+            advanceTimeToMakeSegmentDeletable();
+        }
         task.cleanupExpiredRemoteLogSegments();
 
-        assertEquals(200L, currentLogStartOffset.get());
-        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
-        verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
-
-        // Verify the metric for remote delete is updated correctly
-        assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
-        // Verify we did not report any failure for remote deletes
-        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
-        // Verify aggregate metrics
-        assertEquals(2, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
-        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+        if (expectDeletion) {
+            assertEquals(200L, currentLogStartOffset.get());
+            verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
+            verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
+
+            // Verify the metric for remote delete is updated correctly
+            assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteDeleteRequestRate().count());
+            // Verify we did not report any failure for remote deletes
+            assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteDeleteRequestRate().count());
+            // Verify aggregate metrics
+            assertEquals(2, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
+            assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
+        } else {
+            assertEquals(0L, currentLogStartOffset.get());
+            verify(remoteStorageManager, never()).deleteLogSegmentData(any());
+        }
     }
 
     @ParameterizedTest(name = "testDeletionOnOverlappingRetentionBreachedSegments retentionSize={0} retentionMs={1}")
@@ -2698,6 +2708,7 @@ public class RemoteLogManagerTest {
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
 
         RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+        advanceTimeToMakeSegmentDeletable();
         task.cleanupExpiredRemoteLogSegments();
 
         assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
@@ -2752,7 +2763,7 @@ public class RemoteLogManagerTest {
         RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
 
         verifyRemoteDeleteMetrics(0L, 0L);
-
+        advanceTimeToMakeSegmentDeletable();
         task.cleanupExpiredRemoteLogSegments();
 
         assertEquals(200L, currentLogStartOffset.get());
@@ -2884,6 +2895,7 @@ public class RemoteLogManagerTest {
                     });
                 });
 
+        advanceTimeToMakeSegmentDeletable();
         leaderTask.cleanupExpiredRemoteLogSegments();
 
         assertEquals(200L, currentLogStartOffset.get());
@@ -2981,6 +2993,7 @@ public class RemoteLogManagerTest {
 
         RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
         doThrow(new RemoteStorageException("Failed to delete segment")).when(remoteStorageManager).deleteLogSegmentData(any());
+        advanceTimeToMakeSegmentDeletable();
         assertThrows(RemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
 
         assertEquals(100L, currentLogStartOffset.get());
@@ -3036,6 +3049,7 @@ public class RemoteLogManagerTest {
 
         RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
         doThrow(new RetriableRemoteStorageException("Failed to delete segment with retriable exception")).when(remoteStorageManager).deleteLogSegmentData(any());
+        advanceTimeToMakeSegmentDeletable();
         assertThrows(RetriableRemoteStorageException.class, task::cleanupExpiredRemoteLogSegments);
 
         assertEquals(100L, currentLogStartOffset.get());
@@ -3129,6 +3143,15 @@ public class RemoteLogManagerTest {
         verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount);
     }
 
+    /**
+     * Segments created with current time won't be deleted immediately since
+     * retention check uses {@code maxTimestampMs < cleanupUntilMs} (not {@code <=}).
+     * Advance time by 1ms to make segments eligible for deletion.
+     */
+    private void advanceTimeToMakeSegmentDeletable() {
+        ((MockTime) time).sleep(1);
+    }
+
     private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long remoteDeleteLagSegments) {
         assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
                 String.format("Expected to find %d for RemoteDeleteLagBytes metric value, but found %d",
@@ -3256,7 +3279,9 @@ public class RemoteLogManagerTest {
         for (int idx = 0; idx < segmentCount; idx++) {
             long timestamp = time.milliseconds();
             if (idx < deletableSegmentCount) {
-                timestamp = time.milliseconds() - 1;
+                // Use -2 instead of -1 because some test cases use retentionMs=1.
+                // With -1, segment's maxTimestampMs == cleanupUntilMs, so the segment won't be deleted.
+                timestamp = time.milliseconds() - 2;
             }
             long startOffset = (long) idx * recordsPerSegment;
             long endOffset = startOffset + recordsPerSegment - 1;
