divijvaidya commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1484189476


##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -421,6 +446,18 @@ private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
         return new RecordBatchIterator<>(inputStream);
     }
 
+    /**
+     * Try populating OS page cache with file content
+     */
+    public void prepareForRead() throws IOException {
+        if (DEVNULL_PATH != null) {
+            long size = Math.min(channel.size(), end) - start;

Review Comment:
   Isn't the `size` member of `FileRecords` precomputed during construction and maintained whenever there is a mutation? If so, we can simply use `this.size` here, no?
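
   A minimal sketch of that suggestion, assuming `sizeInBytes()` exposes the precomputed `size` field that `FileRecords` maintains:

   ```java
   /**
    * Try populating OS page cache with file content.
    */
   public void prepareForRead() throws IOException {
       if (DEVNULL_PATH != null) {
           // Reuse the byte count the class already tracks instead of
           // recomputing it from channel.size().
           long size = sizeInBytes();
           try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
               channel.transferTo(start, size, devnullChannel);
           }
       }
   }
   ```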



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##########
@@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
-        this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty());
+        this(fetchOffsetMetadata,
+             records,
+             firstEntryIncomplete,
+             abortedTransactions,
+             Optional.empty(),
+             LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);
     }
 
     public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
-                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch) {
+                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch,
+                         LogOffsetMetadata maxOffsetMetadata) {
         this.fetchOffsetMetadata = fetchOffsetMetadata;
         this.records = records;
         this.firstEntryIncomplete = firstEntryIncomplete;
         this.abortedTransactions = abortedTransactions;
         this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
+        this.maxOffsetMetadata = maxOffsetMetadata;
+    }
+
+    public FetchDataInfo withMaxOffsetMetadata(LogOffsetMetadata maxOffsetMetadata) {
+        return new FetchDataInfo(fetchOffsetMetadata,
+                                 records,
+                                 firstEntryIncomplete,
+                                 abortedTransactions,
+                                 delayedRemoteStorageFetch,
+                                 maxOffsetMetadata);
+    }
+
+    public boolean isLastSegment() {

Review Comment:
   The last segment is always the active segment. Shall we call it `isActiveSegment()`?
   
   Then again, `FetchDataInfo` is not a property of a segment, so `isActiveSegment()` sounds counterintuitive. But the fetch data also cannot span multiple segments, so perhaps rename the function to `isFetchDataFromActiveSegment()`.
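
   A sketch of the suggested rename; the method body is not shown in the hunk, so the comparison below is an illustrative assumption:

   ```java
   public boolean isFetchDataFromActiveSegment() {
       // Illustrative assumption: the fetch reads from the active segment
       // when it targets the same segment that holds the max offset.
       return fetchOffsetMetadata.segmentBaseOffset == maxOffsetMetadata.segmentBaseOffset;
   }
   ```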



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -421,6 +446,18 @@ private AbstractIterator<FileChannelRecordBatch> batchIterator(int start) {
         return new RecordBatchIterator<>(inputStream);
     }
 
+    /**
+     * Try populating OS page cache with file content
+     */
+    public void prepareForRead() throws IOException {
+        if (DEVNULL_PATH != null) {
+            long size = Math.min(channel.size(), end) - start;
+            try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
+                channel.transferTo(start, size, devnullChannel);

Review Comment:
   Do we want to pre-populate the entire content represented by the `FileRecords`, or just the portion that will be read by the `writeTo()` method? For example, here we pre-populate all content represented by the `FileRecords`, but we might end up reading only a part of it in `writeTo()` (see the `length` parameter of `writeTo()`). Can we instead stick to pre-populating a smaller chunk, perhaps 32 KB, which is the chunk size used by the SSL transport layer while reading from the file?
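
   A hedged sketch of that alternative; the constant name and the 32 KB figure are assumptions taken from this comment, while `channel`, `start`, `end`, and `DEVNULL_PATH` come from the hunk above:

   ```java
   // Chunk size matching what the SSL transport layer uses when
   // reading from a file (per this comment; an assumption here).
   private static final long PREFETCH_CHUNK_BYTES = 32 * 1024;

   /**
    * Try populating the OS page cache with a bounded prefix of the file
    * content rather than the whole slice represented by this FileRecords.
    */
   public void prepareForRead() throws IOException {
       if (DEVNULL_PATH != null) {
           long size = Math.min(Math.min(channel.size(), end) - start, PREFETCH_CHUNK_BYTES);
           try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
               channel.transferTo(start, size, devnullChannel);
           }
       }
   }
   ```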



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##########
@@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
-        this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty());
+        this(fetchOffsetMetadata,
+             records,
+             firstEntryIncomplete,
+             abortedTransactions,
+             Optional.empty(),
+             LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);
     }
 
     public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records,
                          boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
-                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch) {
+                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch,
+                         LogOffsetMetadata maxOffsetMetadata) {
         this.fetchOffsetMetadata = fetchOffsetMetadata;
         this.records = records;
         this.firstEntryIncomplete = firstEntryIncomplete;
         this.abortedTransactions = abortedTransactions;
         this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
+        this.maxOffsetMetadata = maxOffsetMetadata;
+    }
+
+    public FetchDataInfo withMaxOffsetMetadata(LogOffsetMetadata maxOffsetMetadata) {
+        return new FetchDataInfo(fetchOffsetMetadata,
+                                 records,
+                                 firstEntryIncomplete,
+                                 abortedTransactions,
+                                 delayedRemoteStorageFetch,
+                                 maxOffsetMetadata);
+    }
+
+    public boolean isLastSegment() {

Review Comment:
   Please add documentation stating that there may be cases where we don't know (`UNKNOWN_OFFSET_METADATA`), and that in such cases we return false; hence, this function may return false negatives.
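
   A sketch of the documentation being asked for, over the method as named in the PR (the body is illustrative, since the hunk truncates before it):

   ```java
   /**
    * Whether the fetched data comes from the last (active) segment.
    * <p>
    * Note: when {@code maxOffsetMetadata} is {@code UNKNOWN_OFFSET_METADATA}
    * we cannot tell and return {@code false}, so callers must tolerate
    * false negatives from this method.
    */
   public boolean isLastSegment() {
       // Illustrative body: unknown metadata conservatively yields false.
       return maxOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA
           && fetchOffsetMetadata.segmentBaseOffset == maxOffsetMetadata.segmentBaseOffset;
   }
   ```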



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
