kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1832738150
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
 
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, fetchInfo.records, fetchInfo.firstEntryIncomplete,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
 
-        // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old events on which txnIdx may not be present.

Review Comment:
   You may be referring to the producer-snapshot file, which wasn't present for all segments in pre-2.8 builds. The `txnIndex` is still optional in v3.9 when a segment has no aborted txn entries. The backward compatibility mentioned in this comment is for old events that were stored in the `__remote_log_metadata` topic: those events report `txnIdxEmpty` as false, yet the transaction index may not exist in remote storage. Updated the comment to clarify this.
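
For readers following the thread: the diff snippet is truncated at the anchored line, so the `ofNullable` call the comment refers to is not visible above. Below is a minimal sketch of how the completed helper might read, inferred from the removed call to `indexCache.getIndexEntry(metadata).txnIndex()` earlier in the hunk and the `isTxnIdxEmpty()` guard; the exact body in the PR may differ.

```java
// Sketch only. Assumes `indexCache` is RemoteLogManager's RemoteIndexCache field and that
// getIndexEntry(currentMetadata).txnIndex() returns null when the segment has no transaction
// index file in remote storage (the backward-compatibility case described in the comment).
private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
    return !currentMetadata.isTxnIdxEmpty() ?
            // `ofNullable` guards against old events from the __remote_log_metadata topic that
            // report txnIdxEmpty as false even though the index was never uploaded.
            Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) :
            Optional.empty();
}
```

With this shape, a missing index on an old event collapses to `Optional.empty()`, so `collectAbortedTransactions` simply advances to `findNextSegmentWithTxnIndex` instead of failing on the absent file.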