kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1832738150


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
 
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
                 fetchInfo.records,
                 fetchInfo.firstEntryIncomplete,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old events on which txnIdx may not be present.

Review Comment:
   You may be referring to the producer-snapshot file, which wasn't present for all segments in pre-2.8 builds. The `txnIndex` is still optional in v3.9 when there are no aborted txn entries.
   
   The backward compatibility mentioned in this comment is for old events that were stored in the `__remote_log_metadata` topic. Those old events report `txnIdxEmpty` as false, but the transaction index may not exist in the remote storage. Updated the comment to clarify this.
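   
   For illustration, here is a minimal sketch of how the `ofNullable` guard could complete the helper that is truncated in the diff above. The cache lookup via `indexCache.getIndexEntry(...).txnIndex()` comes from the removed line in the diff; the exact body below is an assumption, not necessarily the code merged in this PR:
   
       private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
           return !currentMetadata.isTxnIdxEmpty() ?
                   // Assumed guard: old events stored in the `__remote_log_metadata` topic report
                   // txnIdxEmpty as false even when no index was uploaded, so the cached entry's
                   // txnIndex() can legitimately be null and must not be wrapped with `Optional.of`.
                   Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) :
                   Optional.empty();
       }
   
   Wrapping with `Optional.of` here would throw a NullPointerException for those old events, which is why the distinction matters.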


