divijvaidya commented on code in PR #14289: URL: https://github.com/apache/kafka/pull/14289#discussion_r1484189476
########## clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -421,6 +446,18 @@ private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
         return new RecordBatchIterator<>(inputStream);
     }

+    /**
+     * Try populating OS page cache with file content
+     */
+    public void prepareForRead() throws IOException {
+        if (DEVNULL_PATH != null) {
+            long size = Math.min(channel.size(), end) - start;

Review Comment:
   Isn't the `size` member of `FileRecords` precomputed during construction and maintained on every mutation? If so, we can simply use `this.size` here, no?

########## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##########
@@ -39,19 +40,40 @@
     public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                          Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
-        this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty());
+        this(fetchOffsetMetadata,
+             records,
+             firstEntryIncomplete,
+             abortedTransactions,
+             Optional.empty(),
+             LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);
     }

     public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                          Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
-                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch) {
+                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch,
+                         LogOffsetMetadata maxOffsetMetadata) {
         this.fetchOffsetMetadata = fetchOffsetMetadata;
         this.records = records;
         this.firstEntryIncomplete = firstEntryIncomplete;
         this.abortedTransactions = abortedTransactions;
         this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
+        this.maxOffsetMetadata = maxOffsetMetadata;
+    }
+
+    public FetchDataInfo withMaxOffsetMetadata(LogOffsetMetadata maxOffsetMetadata) {
+        return new FetchDataInfo(fetchOffsetMetadata,
+                                 records,
+                                 firstEntryIncomplete,
+                                 abortedTransactions,
+                                 delayedRemoteStorageFetch,
+                                 maxOffsetMetadata);
+    }
+
+    public boolean isLastSegment() {

Review Comment:
   The last segment is always the active segment, so shall we call it `isActiveSegment()`? On the other hand, `FetchDataInfo` is not a property of a segment, so `isActiveSegment()` sounds counter-intuitive here. The fetch data also cannot span multiple segments, so perhaps rename the function to `isFetchDataFromActiveSegment()`.

########## clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
+            try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
+                channel.transferTo(start, size, devnullChannel);

Review Comment:
   Do we want to pre-populate the page cache with the entire content represented by the `FileRecords`, or only the part that will actually be read by the `writeTo()` method? As written, we pre-populate everything, but `writeTo()` may end up reading only a portion of it (see its `length` parameter). Can we instead stick to pre-populating a smaller chunk, perhaps 32 KB, which is the chunk size used by the SSL transport layer while reading from the file?
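The chunked pre-population suggested above could look roughly like the following standalone sketch. This is not the Kafka implementation: the class name, the `prepareForRead(channel, start, end)` signature, the `PREFETCH_CHUNK` constant, and the temp-file demo are assumptions for illustration; only the `FileChannel.transferTo` copy to `/dev/null` mirrors the approach in the PR.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PageCacheWarmSketch {
    // Assumed chunk size; the review suggests 32 KB to match the SSL
    // transport layer's read size.
    private static final int PREFETCH_CHUNK = 32 * 1024;
    private static final Path DEVNULL_PATH = Paths.get("/dev/null");

    /**
     * Warms the OS page cache for at most one chunk of [start, end) by
     * copying the bytes to /dev/null; the kernel pulls them into the page
     * cache on the way. Returns the number of bytes transferred.
     */
    static long prepareForRead(FileChannel channel, long start, long end) throws IOException {
        long size = Math.min(Math.min(channel.size(), end) - start, PREFETCH_CHUNK);
        try (FileChannel devnull = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
            return channel.transferTo(start, size, devnull);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("records", ".log");
        Files.write(tmp, new byte[64 * 1024]); // 64 KB of data, larger than one chunk
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            long warmed = prepareForRead(ch, 0, ch.size());
            // Only the first chunk is warmed, not the whole 64 KB.
            System.out.println(warmed > 0 && warmed <= PREFETCH_CHUNK);
        } finally {
            Files.delete(tmp);
        }
    }
}
```

Capping the transfer at one chunk keeps the warm-up proportional to what the transport layer will read next, instead of faulting in the whole file range up front.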
########## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##########
+    public boolean isLastSegment() {

Review Comment:
   Please add documentation stating that there are cases where we don't know the max offset metadata (`UNKNOWN_OFFSET_METADATA`); in those cases we return false, so this function may return a false negative.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
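To make the rename and the documented false-negative behaviour discussed in the review concrete, here is a minimal standalone sketch. These are not the actual Kafka classes: the stand-in `LogOffsetMetadata` record, its fields, and the comparison by `segmentBaseOffset` are assumptions for illustration only.

```java
public class FetchDataInfoSketch {

    // Stand-in for Kafka's LogOffsetMetadata; only what the sketch needs.
    record LogOffsetMetadata(long messageOffset, long segmentBaseOffset) {
        static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA =
            new LogOffsetMetadata(-1L, -1L);

        boolean isUnknown() {
            return this == UNKNOWN_OFFSET_METADATA;
        }
    }

    record FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         LogOffsetMetadata maxOffsetMetadata) {
        /**
         * Returns true when the fetched data comes from the active (last)
         * segment. May return a false negative: when maxOffsetMetadata is
         * UNKNOWN_OFFSET_METADATA we cannot tell, and conservatively
         * return false.
         */
        boolean isFetchDataFromActiveSegment() {
            return !maxOffsetMetadata.isUnknown()
                && fetchOffsetMetadata.segmentBaseOffset() == maxOffsetMetadata.segmentBaseOffset();
        }
    }

    public static void main(String[] args) {
        FetchDataInfo known = new FetchDataInfo(
            new LogOffsetMetadata(100L, 0L), new LogOffsetMetadata(200L, 0L));
        FetchDataInfo unknown = new FetchDataInfo(
            new LogOffsetMetadata(100L, 0L), LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);

        System.out.println(known.isFetchDataFromActiveSegment());
        // Unknown max offset metadata: conservatively false (a false negative).
        System.out.println(unknown.isFetchDataFromActiveSegment());
    }
}
```

The conservative default matters for callers: treating "unknown" as "not the active segment" is safe as long as no caller relies on a `true` result for correctness.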