This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f50a17fa8d2 KAFKA-18606: Flaky test
DeleteSegmentsByRetentionTimeTest#executeTieredStorageTest (#18861)
f50a17fa8d2 is described below
commit f50a17fa8d2dbecc69891e2310763d64fe97175b
Author: Ken Huang <[email protected]>
AuthorDate: Fri Mar 14 23:10:36 2025 +0800
KAFKA-18606: Flaky test
DeleteSegmentsByRetentionTimeTest#executeTieredStorageTest (#18861)
Jira: https://issues.apache.org/jira/browse/KAFKA-18606
This flaky test is caused by
https://github.com/apache/kafka/commit/23c459286bcf6c5a93fb629fd15f6398aa3e99ed,
which modified the default `LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT` from
`Long.MAX_VALUE` to `3600000` (1 hour) and removed the produce
record timestamp
https://github.com/apache/kafka/commit/23c459286bcf6c5a93fb629fd15f6398aa3e99ed#diff-d157da9c40cb386be02d1f917db9e5f6293cdbc82e45a39115bf8629fc19d55cL59.
This test case verifies that, after the retention period
(1ms) has expired, the data is handled and the related segment is deleted. The old test
https://github.com/apache/kafka/pull/16932 added
`TimeUnit.DAYS.toMillis(1)`, which makes the third record expire; thus this
test is very flaky due to this record.
Reviewers: Jun Rao <[email protected]>
---
.../tiered/storage/actions/ConsumeAction.java | 44 ++++++++++++++++++----
.../integration/BaseDeleteSegmentsTest.java | 5 ++-
2 files changed, 40 insertions(+), 9 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index df8f255830a..593a618c3f5 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -171,20 +171,48 @@ public final class ConsumeAction implements
TieredStorageTestAction {
expectedCountAndOp = new
RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
}
- String message = String.format("Number of %s requests from broker
%d to the tier storage does not match the expected value for topic-partition
%s",
- eventType, remoteFetchSpec.getSourceBrokerId(),
remoteFetchSpec.getTopicPartition());
- if (expectedCountAndOp.getCount() != -1) {
- if (expectedCountAndOp.getOperationType() ==
RemoteFetchCount.OperationType.EQUALS_TO) {
- assertEquals(expectedCountAndOp.getCount(),
eventsInScope.size(), message);
- } else if (expectedCountAndOp.getOperationType() ==
RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
- assertTrue(eventsInScope.size() <=
expectedCountAndOp.getCount(), message);
+ RemoteFetchCount.OperationType exceptedOperationType =
expectedCountAndOp.getOperationType();
+ int exceptedCount = expectedCountAndOp.getCount();
+ int actualCount = eventsInScope.size();
+ String message = errorMessage(eventType, actualCount,
exceptedOperationType, exceptedCount);
+ if (exceptedCount != -1) {
+ if (exceptedOperationType ==
RemoteFetchCount.OperationType.EQUALS_TO) {
+ assertEquals(exceptedCount, actualCount, message);
+ } else if (exceptedOperationType ==
RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
+ assertTrue(actualCount <= exceptedCount, message);
} else {
- assertTrue(eventsInScope.size() >=
expectedCountAndOp.getCount(), message);
+ assertTrue(actualCount >= exceptedCount, message);
}
}
}
}
+ private String errorMessage(
+ LocalTieredStorageEvent.EventType eventType,
+ int actualCount,
+ RemoteFetchCount.OperationType exceptedOperationType,
+ int exceptedCount
+ ) {
+ return String.format(
+ "Expected %s requests count from broker %d to tiered storage for
topic-partition %s to be %s %d, " +
+ "but actual count was %d.",
+ eventType,
+ remoteFetchSpec.getSourceBrokerId(),
+ remoteFetchSpec.getTopicPartition(),
+ operationTypeToString(exceptedOperationType),
+ exceptedCount,
+ actualCount
+ );
+ }
+
+ private String operationTypeToString(RemoteFetchCount.OperationType
operationType) {
+ return switch (operationType) {
+ case EQUALS_TO -> "equal to";
+ case LESS_THAN_OR_EQUALS_TO -> "less than or equal to";
+ case GREATER_THAN_OR_EQUALS_TO -> "greater than or equal to";
+ };
+ }
+
@Override
public void describe(PrintStream output) {
output.println("consume-action:");
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index d94bb571cbe..cac1abe0aba 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@@ -55,7 +56,9 @@ public abstract class BaseDeleteSegmentsTest extends
TieredStorageTestHarness {
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0",
"v0"), new KeyValueSpec("k1", "v1"),
- new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3", System.currentTimeMillis()))
+ // DeleteSegmentsByRetentionTimeTest uses a tiny
retention time, which could cause the active
+ // segment to be rolled and deleted. We use a future
timestamp to prevent that from happening.
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3", System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))
// update the topic config such that it triggers the deletion
of segments
.updateTopicConfig(topicA, configsToBeAdded(),
Collections.emptyList())
// expect that the three offloaded remote log segments are
deleted