This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 6ea2ed3db05 KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (#17676) (#17733)
6ea2ed3db05 is described below

commit 6ea2ed3db050b89b1a491329a55f6a66a035d53c
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Sun Nov 10 22:54:26 2024 +0530

    KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (#17676) (#17733)
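    
    The read path previously built the returned LogOffsetMetadata from the
    requested fetch offset and the index-lookup position:
    
        new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos)
    
    The first batch actually returned, however, may start before the requested
    offset (at its baseOffset) and at a byte position deeper into the segment,
    past any batches skipped while scanning forward, so addAbortedTransactions()
    could derive an inaccurate upperBoundOffset from that metadata.
    findFirstBatch() now also reports how many bytes it skipped, letting the
    metadata describe the batch that is really returned:
    
        new LogOffsetMetadata(firstBatch.baseOffset(), remoteLogSegmentMetadata.startOffset(), startPos + skippedBytes)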
    
    Reviewers: Jun Rao <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 39 ++++++++---
 .../kafka/log/remote/RemoteLogManagerTest.java     | 79 ++++++++++++++++++++--
 2 files changed, 103 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6f4f5daa9c0..577f9b080de 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1610,25 +1610,26 @@ public class RemoteLogManager implements Closeable {
         }
 
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        EnrichedRecordBatch enrichedRecordBatch = new EnrichedRecordBatch(null, 0);
         InputStream remoteSegInputStream = null;
         try {
             int startPos = 0;
-            RecordBatch firstBatch = null;
-
             //  Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
             //  It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
             //  And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
-            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+            while (enrichedRecordBatch.batch == null && rlsMetadataOptional.isPresent()) {
                 remoteLogSegmentMetadata = rlsMetadataOptional.get();
                 // Search forward for the position of the last offset that is greater than or equal to the target offset
                 startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
                 remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
                 RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream);
-                firstBatch = findFirstBatch(remoteLogInputStream, offset);
-                if (firstBatch == null) {
+                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
+                if (enrichedRecordBatch.batch == null) {
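+                    // No batch found in this segment: close its stream before moving on
+                    // to the next segment's metadata, otherwise the stream would leak.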
+                    Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
                     rlsMetadataOptional = findNextSegmentMetadata(rlsMetadataOptional.get(), logOptional.get().leaderEpochCache());
                 }
             }
+            RecordBatch firstBatch = enrichedRecordBatch.batch;
             if (firstBatch == null)
                 return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                         includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
@@ -1659,8 +1660,9 @@ public class RemoteLogManager implements Closeable {
             }
             buffer.flip();
 
+            startPos = startPos + enrichedRecordBatch.skippedBytes;
             FetchDataInfo fetchDataInfo = new FetchDataInfo(
-                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    new LogOffsetMetadata(firstBatch.baseOffset(), remoteLogSegmentMetadata.startOffset(), startPos),
                     MemoryRecords.readableRecords(buffer));
             if (includeAbortedTxns) {
                 fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
@@ -1668,7 +1670,9 @@ public class RemoteLogManager implements Closeable {
 
             return fetchDataInfo;
         } finally {
-            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            if (enrichedRecordBatch.batch != null) {
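+                // On the not-found path the stream was already closed inside the loop above.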
+                Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            }
         }
     }
     // for testing
@@ -1763,15 +1767,18 @@ public class RemoteLogManager implements Closeable {
     }
 
     // Visible for testing
-    RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
-        RecordBatch nextBatch;
+    EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+        int skippedBytes = 0;
+        RecordBatch nextBatch = null;
         // Look for the batch which has the desired offset
         // We will always have a batch in that segment as it is a non-compacted topic.
         do {
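+            // Accumulate the size of every batch passed over; the batch that finally
+            // satisfies lastOffset() >= offset is read but never added to the count.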
+            if (nextBatch != null) {
+                skippedBytes += nextBatch.sizeInBytes();
+            }
             nextBatch = remoteLogInputStream.nextBatch();
         } while (nextBatch != null && nextBatch.lastOffset() < offset);
-
-        return nextBatch;
+        return new EnrichedRecordBatch(nextBatch, skippedBytes);
     }
 
     OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
@@ -2122,4 +2129,14 @@ public class RemoteLogManager implements Closeable {
                     '}';
         }
     }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+    }
 }
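
To make the position arithmetic concrete, here is a small self-contained sketch
(plain Java, no Kafka dependencies; the hypothetical SkippedBytesDemo class and
Batch record are illustrations, and the 91-byte batch size is inferred from the
new test's expected 273 = 3 * 91) of the skipped-bytes accounting that
findFirstBatch() now performs:

    import java.util.ArrayList;
    import java.util.List;

    public class SkippedBytesDemo {
        // A segment batch reduced to the three properties the accounting needs.
        record Batch(long baseOffset, long lastOffset, int sizeInBytes) {}

        public static void main(String[] args) {
            // Records in batches of 3: [0..2], [3..5], ..., [96..98].
            List<Batch> segment = new ArrayList<>();
            for (long base = 0; base < 99; base += 3)
                segment.add(new Batch(base, base + 2, 91));

            long fetchOffset = 10;  // requested offset
            int startPos = 0;       // position returned by the offset-index lookup
            int skippedBytes = 0;
            Batch firstBatch = null;
            for (Batch b : segment) {
                if (b.lastOffset() >= fetchOffset) {
                    firstBatch = b;  // first batch whose range reaches the target
                    break;
                }
                skippedBytes += b.sizeInBytes();  // whole batch lies before the target
            }

            System.out.println(firstBatch.baseOffset());   // 9, not the requested 10
            System.out.println(startPos + skippedBytes);   // 0 + 3 * 91 = 273
        }
    }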
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 81d7e3f100f..24c89ce3970 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -135,6 +135,7 @@ import scala.Option;
 import scala.collection.JavaConverters;
 
 import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -150,6 +151,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -2932,8 +2934,8 @@ public class RemoteLogManagerTest {
             }
 
             // This is the key scenario that we are testing here
-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
-                return null;
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+                return new EnrichedRecordBatch(null, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -2999,10 +3001,10 @@ public class RemoteLogManagerTest {
                 return 1;
             }
 
-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                 when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
                 doNothing().when(firstBatch).writeTo(capture.capture());
-                return firstBatch;
+                return new EnrichedRecordBatch(firstBatch, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -3396,6 +3398,75 @@ public class RemoteLogManagerTest {
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
     }
 
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    long offset = ans.getArgument(2);
+                    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                    RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                            offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                    return Optional.of(segmentMetadata);
+                });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
+                brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+                1048576, true, leaderTopicIdPartition.topicPartition(),
+                partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
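+        // With 3-record batches, offset 10 falls in the batch spanning [9..11], so baseOffset is 9.
+        // Three full batches precede it, and the expected position is startPos (0) plus the
+        // skipped bytes: 3 batches * 91 bytes = 273.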
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                            records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();
+        }
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
