This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f50a17fa8d2 KAFKA-18606: Flaky test DeleteSegmentsByRetentionTimeTest#executeTieredStorageTest (#18861)
f50a17fa8d2 is described below

commit f50a17fa8d2dbecc69891e2310763d64fe97175b
Author: Ken Huang <[email protected]>
AuthorDate: Fri Mar 14 23:10:36 2025 +0800

    KAFKA-18606: Flaky test DeleteSegmentsByRetentionTimeTest#executeTieredStorageTest (#18861)
    
    Jira: https://issues.apache.org/jira/browse/KAFKA-18606
    
    This flaky test is caused by
    https://github.com/apache/kafka/commit/23c459286bcf6c5a93fb629fd15f6398aa3e99ed,
    which changed the default `LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT` from
    `Long.MAX_VALUE` to `3600000` (1 hour) and removed the produce record timestamp:
    https://github.com/apache/kafka/commit/23c459286bcf6c5a93fb629fd15f6398aa3e99ed#diff-d157da9c40cb386be02d1f917db9e5f6293cdbc82e45a39115bf8629fc19d55cL59.
    This test case verifies data handling after the retention period (1 ms) has
    expired and the related segment is then deleted. The old test
    https://github.com/apache/kafka/pull/16932 added `TimeUnit.DAYS.toMillis(1)`,
    which makes the third record expire, so this record frequently made the test
    flaky (a short sketch of the timestamp arithmetic follows the commit message).
    
    Reviewers: Jun Rao <[email protected]>
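
For context, here is a minimal, standalone Java sketch of the timestamp arithmetic described above. It is not broker code: the constant and helper names are illustrative assumptions. It shows why a record timestamped one day ahead (the old value from PR 16932) exceeds the new one-hour `message.timestamp.after.max.ms` default, while a timestamp exactly one hour ahead stays within the limit and also keeps the record well ahead of the test's 1 ms retention window.

import java.util.concurrent.TimeUnit;

public class TimestampWindowSketch {

    // New default introduced by commit 23c4592 (previously Long.MAX_VALUE): 1 hour.
    static final long TIMESTAMP_AFTER_MAX_MS = TimeUnit.HOURS.toMillis(1);
    // Retention configured by DeleteSegmentsByRetentionTimeTest.
    static final long RETENTION_MS = 1L;

    // Mirrors the idea of the broker's "timestamp too far in the future" validation.
    static boolean withinFutureLimit(long now, long recordTimestamp) {
        return recordTimestamp - now <= TIMESTAMP_AFTER_MAX_MS;
    }

    // A record whose timestamp falls behind now - retention.ms becomes eligible for deletion.
    static boolean expiredByRetention(long now, long recordTimestamp) {
        return now - recordTimestamp > RETENTION_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long oneDayAhead = now + TimeUnit.DAYS.toMillis(1);    // old test value (PR 16932)
        long oneHourAhead = now + TimeUnit.HOURS.toMillis(1);  // value chosen by this fix
        long plainNow = now;                                   // value after commit 23c4592

        System.out.println(withinFutureLimit(now, oneDayAhead));        // false: rejected under the 1 hour default
        System.out.println(withinFutureLimit(now, oneHourAhead));       // true: accepted
        System.out.println(expiredByRetention(now + 5, plainNow));      // true: expires almost immediately
        System.out.println(expiredByRetention(now + 5, oneHourAhead));  // false: stays well within retention
    }
}
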
---
 .../tiered/storage/actions/ConsumeAction.java      | 44 ++++++++++++++++++----
 .../integration/BaseDeleteSegmentsTest.java        |  5 ++-
 2 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index df8f255830a..593a618c3f5 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -171,20 +171,48 @@ public final class ConsumeAction implements TieredStorageTestAction {
                     expectedCountAndOp = new RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
             }
 
-            String message = String.format("Number of %s requests from broker %d to the tier storage does not match the expected value for topic-partition %s",
-                    eventType, remoteFetchSpec.getSourceBrokerId(), remoteFetchSpec.getTopicPartition());
-            if (expectedCountAndOp.getCount() != -1) {
-                if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.EQUALS_TO) {
-                    assertEquals(expectedCountAndOp.getCount(), eventsInScope.size(), message);
-                } else if (expectedCountAndOp.getOperationType() == RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
-                    assertTrue(eventsInScope.size() <= expectedCountAndOp.getCount(), message);
+            RemoteFetchCount.OperationType exceptedOperationType = expectedCountAndOp.getOperationType();
+            int exceptedCount = expectedCountAndOp.getCount();
+            int actualCount = eventsInScope.size();
+            String message = errorMessage(eventType, actualCount, exceptedOperationType, exceptedCount);
+            if (exceptedCount != -1) {
+                if (exceptedOperationType == RemoteFetchCount.OperationType.EQUALS_TO) {
+                    assertEquals(exceptedCount, actualCount, message);
+                } else if (exceptedOperationType == RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
+                    assertTrue(actualCount <= exceptedCount, message);
                 } else {
-                    assertTrue(eventsInScope.size() >= expectedCountAndOp.getCount(), message);
+                    assertTrue(actualCount >= exceptedCount, message);
                 }
             }
         }
     }
 
+    private String errorMessage(
+        LocalTieredStorageEvent.EventType eventType,
+        int actualCount,
+        RemoteFetchCount.OperationType exceptedOperationType,
+        int exceptedCount
+    ) {
+        return String.format(
+            "Expected %s requests count from broker %d to tiered storage for 
topic-partition %s to be %s %d, " +
+                    "but actual count was %d.",
+            eventType,
+            remoteFetchSpec.getSourceBrokerId(),
+            remoteFetchSpec.getTopicPartition(),
+            operationTypeToString(exceptedOperationType),
+            exceptedCount,
+            actualCount
+        );
+    }
+
+    private String operationTypeToString(RemoteFetchCount.OperationType operationType) {
+        return switch (operationType) {
+            case EQUALS_TO -> "equal to";
+            case LESS_THAN_OR_EQUALS_TO -> "less than or equal to";
+            case GREATER_THAN_OR_EQUALS_TO -> "greater than or equal to";
+        };
+    }
+
     @Override
     public void describe(PrintStream output) {
         output.println("consume-action:");
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index d94bb571cbe..cac1abe0aba 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
 
@@ -55,7 +56,9 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
                 .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
+                        // DeleteSegmentsByRetentionTimeTest uses a tiny retention time, which could cause the active
+                        // segment to be rolled and deleted. We use a future timestamp to prevent that from happening.
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
                 // update the topic config such that it triggers the deletion of segments
                 .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
                 // expect that the three offloaded remote log segments are deleted

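The test triggers segment deletion by updating the topic config (`updateTopicConfig(topicA, configsToBeAdded(), ...)`), but `configsToBeAdded()` is not shown in this diff. As a hedged sketch only, the following standalone AdminClient snippet shows how an equivalent retention change could be applied to a topic; the topic name, bootstrap address, and retention value are assumptions for illustration.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TriggerRetentionDeletionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topicA"); // assumed topic
            // Shrink retention.ms to a very small value, analogous to the "tiny retention time"
            // mentioned in the added code comment, so expired segments become eligible for deletion.
            List<AlterConfigOp> ops = List.of(
                    new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}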