This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 0d553cc9c66 KAFKA-15499: Fix the flaky 
DeleteSegmentsDueToLogStartOffsetBreach test (#14439)
0d553cc9c66 is described below

commit 0d553cc9c66d9f9f365b72bd384f1dd2997216f0
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Sep 28 18:34:37 2023 +0530

    KAFKA-15499: Fix the flaky DeleteSegmentsDueToLogStartOffsetBreach test 
(#14439)
    
    DeleteSegmentsDueToLogStartOffsetBreach configures the segment such that it 
can hold at-most 2 record-batches. And, it asserts that the 
local-log-start-offset based on the assumption that each segment will contain 
exactly two messages.
    
    During leader switch, the segment can get rotated and may not always 
contain two records. Previously, we were checking whether the expected 
local-log-start-offset is equal to the 
base-offset-of-the-first-local-log-segment. With this patch, we will scan the 
first local-log-segment for the expected offset.
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../log/remote/storage/LocalTieredStorageTest.java | 11 +----
 .../tiered/storage/utils/BrokerLocalStorage.java   | 51 +++++++++++++++++++---
 2 files changed, 45 insertions(+), 17 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
index 00ac899c349..b3fa277a284 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java
@@ -43,7 +43,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
-import java.text.NumberFormat;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -595,14 +594,6 @@ public final class LocalTieredStorageTest {
         private static final byte[] PRODUCER_SNAPSHOT_FILE_BYTES = 
"pid".getBytes();
         private static final byte[] LEADER_EPOCH_CHECKPOINT_FILE_BYTES = 
"0\n2\n0 0\n2 12".getBytes();
 
-        private static final NumberFormat OFFSET_FORMAT = 
NumberFormat.getInstance();
-
-        static {
-            OFFSET_FORMAT.setMaximumIntegerDigits(20);
-            OFFSET_FORMAT.setMaximumFractionDigits(0);
-            OFFSET_FORMAT.setGroupingUsed(false);
-        }
-
         private final Path segmentPath = Paths.get("local-segments");
         private long baseOffset = 0;
 
@@ -621,7 +612,7 @@ public final class LocalTieredStorageTest {
         }
 
         LogSegmentData nextSegment(final byte[]... data) {
-            final String offset = OFFSET_FORMAT.format(baseOffset);
+            final String offset = 
LogFileUtils.filenamePrefixFromOffset(baseOffset);
 
             try {
                 final FileChannel channel = FileChannel.open(
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 7197f3a9dff..46cb4b05b68 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.tiered.storage.utils;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
@@ -67,12 +69,14 @@ public final class BrokerLocalStorage {
     public void waitForEarliestLocalOffset(TopicPartition topicPartition,
                                            Long offset) {
         Function<OffsetHolder, Optional<String>> relativePosFunc = 
offsetHolder -> {
-            if (offsetHolder.firstLogFileBaseOffset < offset) {
-                return Optional.of("smaller than");
+            Optional<String> result = Optional.empty();
+            if (offsetHolder.firstLogFileBaseOffset < offset &&
+                    !isOffsetPresentInFirstLocalSegment(topicPartition, 
offsetHolder.firstLogFileBaseOffset, offset)) {
+                result = Optional.of("smaller than");
             } else if (offsetHolder.firstLogFileBaseOffset > offset) {
-                return Optional.of("ahead of");
+                result = Optional.of("ahead of");
             }
-            return Optional.empty();
+            return result;
         };
         waitForOffset(topicPartition, offset, relativePosFunc);
     }
@@ -90,10 +94,12 @@ public final class BrokerLocalStorage {
     public void waitForAtLeastEarliestLocalOffset(TopicPartition 
topicPartition,
                                                   Long offset) {
         Function<OffsetHolder, Optional<String>> relativePosFunc = 
offsetHolder -> {
-            if (offsetHolder.firstLogFileBaseOffset < offset) {
-                return Optional.of("smaller than");
+            Optional<String> result = Optional.empty();
+            if (offsetHolder.firstLogFileBaseOffset < offset &&
+                    !isOffsetPresentInFirstLocalSegment(topicPartition, 
offsetHolder.firstLogFileBaseOffset, offset)) {
+                result = Optional.of("smaller than");
             }
-            return Optional.empty();
+            return result;
         };
         waitForOffset(topicPartition, offset, relativePosFunc);
     }
@@ -119,6 +125,37 @@ public final class BrokerLocalStorage {
         }
     }
 
+    /**
+     * Check if the given offset is present in the first local segment of the 
given topic-partition.
+     * @param topicPartition The topic-partition to check.
+     * @param firstLogFileBaseOffset The base offset of the first local 
segment.
+     * @param offsetToSearch The offset to search.
+     * @return true if the offset is present in the first local segment, false 
otherwise.
+     */
+    private boolean isOffsetPresentInFirstLocalSegment(TopicPartition 
topicPartition,
+                                                       Long 
firstLogFileBaseOffset,
+                                                       Long offsetToSearch)  {
+        if (offsetToSearch < firstLogFileBaseOffset) {
+            return false;
+        }
+        if (offsetToSearch.equals(firstLogFileBaseOffset)) {
+            return true;
+        }
+        File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), 
topicPartition.toString());
+        File firstSegmentFile = new File(partitionDir.getAbsolutePath(),
+                LogFileUtils.filenamePrefixFromOffset(firstLogFileBaseOffset) 
+ LogFileUtils.LOG_FILE_SUFFIX);
+        try (FileRecords fileRecords = FileRecords.open(firstSegmentFile, 
false)) {
+            for (FileLogInputStream.FileChannelRecordBatch batch : 
fileRecords.batches()) {
+                if (batch.baseOffset() <= offsetToSearch && batch.lastOffset() 
>= offsetToSearch) {
+                    return true;
+                }
+            }
+        } catch (final IOException ex) {
+            return false;
+        }
+        return false;
+    }
+
     public void eraseStorage() throws IOException {
         for (File file : 
Objects.requireNonNull(brokerStorageDirectory.listFiles())) {
             Utils.delete(file);

Reply via email to