This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 0d553cc9c66 KAFKA-15499: Fix the flaky
DeleteSegmentsDueToLogStartOffsetBreach test (#14439)
0d553cc9c66 is described below
commit 0d553cc9c66d9f9f365b72bd384f1dd2997216f0
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Sep 28 18:34:37 2023 +0530
KAFKA-15499: Fix the flaky DeleteSegmentsDueToLogStartOffsetBreach test
(#14439)
DeleteSegmentsDueToLogStartOffsetBreach configures the segment such that it
can hold at most 2 record-batches, and it asserts the
local-log-start-offset based on the assumption that each segment will contain
exactly two messages.
During leader switch, the segment can get rotated and may not always
contain two records. Previously, we checked whether the expected
local-log-start-offset was equal to the base offset of the first
local-log-segment. With this patch, we scan the first local-log-segment
for the expected offset.
Reviewers: Divij Vaidya <[email protected]>
---
.../log/remote/storage/LocalTieredStorageTest.java | 11 +----
.../tiered/storage/utils/BrokerLocalStorage.java | 51 +++++++++++++++++++---
2 files changed, 45 insertions(+), 17 deletions(-)
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 00ac899c349..b3fa277a284 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -43,7 +43,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
-import java.text.NumberFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -595,14 +594,6 @@ public final class LocalTieredStorageTest {
private static final byte[] PRODUCER_SNAPSHOT_FILE_BYTES =
"pid".getBytes();
private static final byte[] LEADER_EPOCH_CHECKPOINT_FILE_BYTES =
"0\n2\n0 0\n2 12".getBytes();
- private static final NumberFormat OFFSET_FORMAT =
NumberFormat.getInstance();
-
- static {
- OFFSET_FORMAT.setMaximumIntegerDigits(20);
- OFFSET_FORMAT.setMaximumFractionDigits(0);
- OFFSET_FORMAT.setGroupingUsed(false);
- }
-
private final Path segmentPath = Paths.get("local-segments");
private long baseOffset = 0;
@@ -621,7 +612,7 @@ public final class LocalTieredStorageTest {
}
LogSegmentData nextSegment(final byte[]... data) {
- final String offset = OFFSET_FORMAT.format(baseOffset);
+ final String offset =
LogFileUtils.filenamePrefixFromOffset(baseOffset);
try {
final FileChannel channel = FileChannel.open(
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 7197f3a9dff..46cb4b05b68 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -17,6 +17,8 @@
package org.apache.kafka.tiered.storage.utils;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
@@ -67,12 +69,14 @@ public final class BrokerLocalStorage {
public void waitForEarliestLocalOffset(TopicPartition topicPartition,
Long offset) {
Function<OffsetHolder, Optional<String>> relativePosFunc =
offsetHolder -> {
- if (offsetHolder.firstLogFileBaseOffset < offset) {
- return Optional.of("smaller than");
+ Optional<String> result = Optional.empty();
+ if (offsetHolder.firstLogFileBaseOffset < offset &&
+ !isOffsetPresentInFirstLocalSegment(topicPartition,
offsetHolder.firstLogFileBaseOffset, offset)) {
+ result = Optional.of("smaller than");
} else if (offsetHolder.firstLogFileBaseOffset > offset) {
- return Optional.of("ahead of");
+ result = Optional.of("ahead of");
}
- return Optional.empty();
+ return result;
};
waitForOffset(topicPartition, offset, relativePosFunc);
}
@@ -90,10 +94,12 @@ public final class BrokerLocalStorage {
public void waitForAtLeastEarliestLocalOffset(TopicPartition
topicPartition,
Long offset) {
Function<OffsetHolder, Optional<String>> relativePosFunc =
offsetHolder -> {
- if (offsetHolder.firstLogFileBaseOffset < offset) {
- return Optional.of("smaller than");
+ Optional<String> result = Optional.empty();
+ if (offsetHolder.firstLogFileBaseOffset < offset &&
+ !isOffsetPresentInFirstLocalSegment(topicPartition,
offsetHolder.firstLogFileBaseOffset, offset)) {
+ result = Optional.of("smaller than");
}
- return Optional.empty();
+ return result;
};
waitForOffset(topicPartition, offset, relativePosFunc);
}
@@ -119,6 +125,37 @@ public final class BrokerLocalStorage {
}
}
+ /**
+ * Check if the given offset is present in the first local segment of the
given topic-partition.
+ * @param topicPartition The topic-partition to check.
+ * @param firstLogFileBaseOffset The base offset of the first local
segment.
+ * @param offsetToSearch The offset to search.
+ * @return true if the offset is present in the first local segment, false
otherwise.
+ */
+ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition
topicPartition,
+ Long
firstLogFileBaseOffset,
+ Long offsetToSearch) {
+ if (offsetToSearch < firstLogFileBaseOffset) {
+ return false;
+ }
+ if (offsetToSearch.equals(firstLogFileBaseOffset)) {
+ return true;
+ }
+ File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(),
topicPartition.toString());
+ File firstSegmentFile = new File(partitionDir.getAbsolutePath(),
+ LogFileUtils.filenamePrefixFromOffset(firstLogFileBaseOffset)
+ LogFileUtils.LOG_FILE_SUFFIX);
+ try (FileRecords fileRecords = FileRecords.open(firstSegmentFile,
false)) {
+ for (FileLogInputStream.FileChannelRecordBatch batch :
fileRecords.batches()) {
+ if (batch.baseOffset() <= offsetToSearch && batch.lastOffset()
>= offsetToSearch) {
+ return true;
+ }
+ }
+ } catch (final IOException ex) {
+ return false;
+ }
+ return false;
+ }
+
public void eraseStorage() throws IOException {
for (File file :
Objects.requireNonNull(brokerStorageDirectory.listFiles())) {
Utils.delete(file);