satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1186813113


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = 
epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when 
there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch 
size is more than maximum bytes that can be sent.
+            int firstBatchSize = firstBatch.sizeInBytes();
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, 
remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, 
fetchDataInfo, logOptional.get());
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata 
segmentMetadata,
+                                                 FetchDataInfo fetchInfo,
+                                                 UnifiedLog log) throws 
RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final Set<FetchResponseData.AbortedTransaction> abortedTransactions = 
new HashSet<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator, log);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                            long upperBoundOffset,
+                                            RemoteLogSegmentMetadata 
segmentMetadata,
+                                            Consumer<List<AbortedTxn>> 
accumulator,
+                                            UnifiedLog log) throws 
RemoteStorageException {
+        Iterator<LogSegment> localLogSegments = 
JavaConverters.asJavaIterator(log.logSegments().iterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = 
Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = 
nextSegmentMetadataOpt.map(metadata -> 
indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = 
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = 
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log);
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> 
indexCache.getIndexEntry(x).txnIndex());
+                    if (!txnIndexOpt.isPresent()) {
+                        searchInLocalLog = true;
+                    }
+                }
+
+                if (searchInLocalLog) {
+                    txnIndexOpt = (localLogSegments.hasNext()) ? 
Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();
                 }
+            } else {
+                return;
+            }
+        }
+    }
+
+    private Optional<RemoteLogSegmentMetadata> 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, UnifiedLog 
log) throws RemoteStorageException {
+        Option<LeaderEpochFileCache> leaderEpochFileCacheOption = 
log.leaderEpochCache();
+        if (leaderEpochFileCacheOption.isEmpty()) {
+            return Optional.empty();
+        }
+
+        TopicPartition topicPartition = 
segmentMetadata.topicIdPartition().topicPartition();
+        long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
+        OptionalInt epoch = 
OptionalInt.of(segmentMetadata.segmentLeaderEpochs().lastEntry().getKey());

Review Comment:
   `findNextSegmentMetadata` first tries to find a segment for that epoch and 
the next segment's base offset. If none is found, it gets the next available 
epoch by using `leaderEpochFileCache.nextEpoch(epoch.getAsInt())` and tries to 
find a segment for that epoch. It keeps walking forward through the epoch 
sequence until a segment is found or no further epochs remain, as shown below.
   
   ```
           while (!result.isPresent() && epoch.isPresent()) {
               result = fetchRemoteLogSegmentMetadata(topicPartition, 
epoch.getAsInt(), nextSegmentBaseOffset);
               epoch = leaderEpochFileCache.nextEpoch(epoch.getAsInt());
           }
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to