Hangleton commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1188387671
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +623,210 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + + RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + + if (firstBatch == null) + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, + includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + + int firstBatchSize = firstBatch.sizeInBytes(); + // An empty record is sent instead of an incomplete batch when + // - there is no minimum-one-message constraint and + // - the first batch size is more than maximum bytes that can be sent. + // - for FetchRequest version 3 or above and + if (!remoteStorageFetchInfo.minOneMessage && + !remoteStorageFetchInfo.hardMaxBytesLimit && + firstBatchSize > maxBytes) { + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY); + } + + int updatedFetchSize = + remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes; + + ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); + int remainingBytes = updatedFetchSize; + + firstBatch.writeTo(buffer); + remainingBytes -= firstBatchSize; + + if (remainingBytes > 0) { + // read the input stream until min of (EOF stream or buffer's remaining capacity). 
+ Utils.readFully(remoteSegInputStream, buffer); + } + buffer.flip(); + + FetchDataInfo fetchDataInfo = new FetchDataInfo( + new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), + MemoryRecords.readableRecords(buffer)); + if (includeAbortedTxns) { + fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get()); + } + + return fetchDataInfo; + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); + } + } + + private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); + } + + private FetchDataInfo addAbortedTransactions(long startOffset, Review Comment: Do we have a unit test for this method and/or the associated functionality in isolation? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +623,210 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + 
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); Review Comment: Would it be worth adding a `DEBUG` log here to help with debugging? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), - divergingEpoch = None, - highWatermark = UnifiedLog.UnknownOffset, - leaderLogStartOffset = UnifiedLog.UnknownOffset, - leaderLogEndOffset = UnifiedLog.UnknownOffset, - followerLogStartOffset = UnifiedLog.UnknownOffset, - fetchTimeMs = -1L, - lastStableOffset = None, - exception = Some(e)) + createLogReadResult(e) + case e: OffsetOutOfRangeException => + // In case of offset out of range errors, check for remote log manager for non-compacted topics + // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier + // with NotLeaderForPartitionException or ReplicaNotAvailableException. + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage. + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && + // Check that the fetch offset is within the offset range within the remote storage layer. + log.logStartOffset <= offset && offset < log.localLogStartOffset()) { + // For follower fetch requests, throw an error saying that this offset is moved to tiered storage. 
+ val highWatermark = log.highWatermark + val leaderLogStartOffset = log.logStartOffset Review Comment: Ditto. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), - divergingEpoch = None, - highWatermark = UnifiedLog.UnknownOffset, - leaderLogStartOffset = UnifiedLog.UnknownOffset, - leaderLogEndOffset = UnifiedLog.UnknownOffset, - followerLogStartOffset = UnifiedLog.UnknownOffset, - fetchTimeMs = -1L, - lastStableOffset = None, - exception = Some(e)) + createLogReadResult(e) + case e: OffsetOutOfRangeException => + // In case of offset out of range errors, check for remote log manager for non-compacted topics + // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier + // with NotLeaderForPartitionException or ReplicaNotAvailableException. + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage. + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && + // Check that the fetch offset is within the offset range within the remote storage layer. + log.logStartOffset <= offset && offset < log.localLogStartOffset()) { Review Comment: The log start offset and local log start offset could be updated asynchronously and non-atomically. Could that be a problem? 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -578,9 +595,15 @@ public void run() { return; try { + Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition()); + + if (!unifiedLogOptional.isPresent()) { + return; Review Comment: Should we log this case at `DEBUG` level? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +622,208 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = 
leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + + RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + + if (firstBatch == null) + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, + includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + + // An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint + // and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent. + if (!remoteStorageFetchInfo.minOneMessage && + !remoteStorageFetchInfo.hardMaxBytesLimit && Review Comment: Tagging this comment as no longer valid since a code change addressed it. 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +622,208 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + + RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + + if (firstBatch == null) + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, + includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + + // An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint + // and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent. + int firstBatchSize = firstBatch.sizeInBytes(); + if (!remoteStorageFetchInfo.minOneMessage && + !remoteStorageFetchInfo.hardMaxBytesLimit && + firstBatchSize > maxBytes) { + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY); + } + + int updatedFetchSize = + remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes; + + ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); + int remainingBytes = updatedFetchSize; + + firstBatch.writeTo(buffer); + remainingBytes -= firstBatchSize; + + if (remainingBytes > 0) { + // read the input stream until min of (EOF stream or buffer's remaining capacity). 
+ Utils.readFully(remoteSegInputStream, buffer); + } + buffer.flip(); + + FetchDataInfo fetchDataInfo = new FetchDataInfo( + new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), + MemoryRecords.readableRecords(buffer)); + if (includeAbortedTxns) { + fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get()); + } + + return fetchDataInfo; + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); + } + } + + private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); + } + + private FetchDataInfo addAbortedTransactions(long startOffset, + RemoteLogSegmentMetadata segmentMetadata, + FetchDataInfo fetchInfo, + UnifiedLog log) throws RemoteStorageException { + int fetchSize = fetchInfo.records.sizeInBytes(); + OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment); + + OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex(); + long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize) + .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1); + + final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>(); + + Consumer<List<AbortedTxn>> accumulator = + abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() + .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList())); + + collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log); + + return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, + fetchInfo.records, + fetchInfo.firstEntryIncomplete, + Optional.of(abortedTransactions.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(abortedTransactions))); + } + + private void collectAbortedTransactions(long startOffset, + long upperBoundOffset, + RemoteLogSegmentMetadata segmentMetadata, + Consumer<List<AbortedTxn>> accumulator, + UnifiedLog log) throws RemoteStorageException { + Iterator<LogSegment> localLogSegments = JavaConverters.asJavaIterator(log.logSegments().iterator()); + + boolean searchInLocalLog = false; + Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata); + Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex()); + + while (txnIndexOpt.isPresent()) { + TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset); + accumulator.accept(searchResult.abortedTransactions); + if (!searchResult.isComplete) { + if (!searchInLocalLog) { + nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log); + + txnIndexOpt = nextSegmentMetadataOpt.map(x -> indexCache.getIndexEntry(x).txnIndex()); + if (!txnIndexOpt.isPresent()) { + searchInLocalLog = true; + } + } + + if (searchInLocalLog) { + txnIndexOpt = (localLogSegments.hasNext()) ? 
Optional.of(localLogSegments.next().txnIndex()) : Optional.empty(); } + } else { + return; + } + } + } + + private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, UnifiedLog log) throws RemoteStorageException { + Option<LeaderEpochFileCache> leaderEpochFileCacheOption = log.leaderEpochCache(); + if (leaderEpochFileCacheOption.isEmpty()) { + return Optional.empty(); + } + + TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition(); + long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1; + OptionalInt epoch = OptionalInt.of(segmentMetadata.segmentLeaderEpochs().lastEntry().getKey()); Review Comment: Wouldn't it be possible to resolve the leader epoch of the `nextSegmentBaseOffset` directly from the leader epoch cache of the partition, removing the need for the loop altogether? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +623,210 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + 
FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + + RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + + if (firstBatch == null) + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, + includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty()); + + int firstBatchSize = firstBatch.sizeInBytes(); + // An empty record is sent instead of an incomplete batch when + // - there is no minimum-one-message constraint and + // - the first batch size is more than maximum bytes that can be sent. + // - for FetchRequest version 3 or above and + if (!remoteStorageFetchInfo.minOneMessage && + !remoteStorageFetchInfo.hardMaxBytesLimit && + firstBatchSize > maxBytes) { + return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY); + } + + int updatedFetchSize = + remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes; + + ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); + int remainingBytes = updatedFetchSize; + + firstBatch.writeTo(buffer); + remainingBytes -= firstBatchSize; + + if (remainingBytes > 0) { + // read the input stream until min of (EOF stream or buffer's remaining capacity). + Utils.readFully(remoteSegInputStream, buffer); + } + buffer.flip(); + + FetchDataInfo fetchDataInfo = new FetchDataInfo( + new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), + MemoryRecords.readableRecords(buffer)); + if (includeAbortedTxns) { + fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get()); + } + + return fetchDataInfo; + } finally { + Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); + } + } + + private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); + } + + private FetchDataInfo addAbortedTransactions(long startOffset, + RemoteLogSegmentMetadata segmentMetadata, + FetchDataInfo fetchInfo, + UnifiedLog log) throws RemoteStorageException { + int fetchSize = fetchInfo.records.sizeInBytes(); + OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment); + + OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex(); + long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize) + .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1); + + final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>(); + + Consumer<List<AbortedTxn>> accumulator = + abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() + .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList())); + + collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log); + + return new FetchDataInfo(fetchInfo.fetchOffsetMetadata, + fetchInfo.records, + fetchInfo.firstEntryIncomplete, + Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions))); + } + + private void collectAbortedTransactions(long startOffset, + long upperBoundOffset, + RemoteLogSegmentMetadata segmentMetadata, + Consumer<List<AbortedTxn>> accumulator, + UnifiedLog log) throws RemoteStorageException { + Iterator<LogSegment> localLogSegments = JavaConverters.asJavaIterator(log.logSegments().iterator()); + + boolean searchInLocalLog = false; + Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata); + Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex()); + + while (txnIndexOpt.isPresent()) { + TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset); + accumulator.accept(searchResult.abortedTransactions); + if (!searchResult.isComplete) { + if (!searchInLocalLog) { Review Comment: Maybe we could separate the local and remote accumulation from within the same loop and segregate the branches for local/remote from the core logic? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org