This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d100ab65367fe0b3d74a9901bcaaa26049c996b0
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Tue Aug 22 15:34:52 2023 +0800

    [FLINK-32870][network] Tiered storage supports reading multiple small 
buffers by reading and slicing one large buffer
    
    This closes #23255
---
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java      |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java    |   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java     |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java    |  79 ++++-
 .../file/ProducerMergedPartitionFileIndex.java     |  36 +-
 .../file/ProducerMergedPartitionFileReader.java    | 383 ++++++++++++++-------
 .../file/ProducerMergedPartitionFileWriter.java    |   3 +-
 .../tiered/file/SegmentPartitionFileReader.java    |  29 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 ++++--
 .../tier/remote/RemoteTierConsumerAgent.java       |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java       |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java    |   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java     | 132 ++++---
 .../file/SegmentPartitionFileReaderTest.java       |  25 +-
 .../tiered/file/TestingPartitionFileReader.java    |  21 +-
 20 files changed, 642 insertions(+), 256 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
index 863be5b5b44..f89994c2d44 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil {
         }
     }
 
-    static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws 
IOException {
+    public static void readByteBufferFully(FileChannel channel, ByteBuffer b) 
throws IOException {
         // the post-checked loop here gets away with one less check in the 
normal case
         do {
             if (channel.read(b) == -1) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index 4b4221225cf..25012ffa8f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
         }
 
         @Override
-        public long getRegionFileOffset() {
+        public long getRegionStartOffset() {
             return regionFileOffset;
         }
 
+        @Override
+        public long getRegionEndOffset() {
+            throw new UnsupportedOperationException("This method is not 
supported.");
+        }
+
         @Override
         public int getNumBuffers() {
             return numBuffers;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
index c0d4270e109..77638bb3156 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java
@@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelper<T extends 
FileDataIndexRegionHelper.R
         int getFirstBufferIndex();
 
         /** Get the file start offset of this region. */
-        long getRegionFileOffset();
+        long getRegionStartOffset();
+
+        /** Get the file end offset of the region. */
+        long getRegionEndOffset();
 
         /** Get the number of buffers in this region. */
         int getNumBuffers();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
index 1ca689d0c2e..f306e4291b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java
@@ -59,7 +59,7 @@ public class FileRegionWriteReadUtils {
         headerBuffer.clear();
         headerBuffer.putInt(region.getFirstBufferIndex());
         headerBuffer.putInt(region.getNumBuffers());
-        headerBuffer.putLong(region.getRegionFileOffset());
+        headerBuffer.putLong(region.getRegionStartOffset());
         headerBuffer.flip();
 
         // write payload buffer.
@@ -122,7 +122,8 @@ public class FileRegionWriteReadUtils {
         regionBuffer.clear();
         regionBuffer.putInt(region.getFirstBufferIndex());
         regionBuffer.putInt(region.getNumBuffers());
-        regionBuffer.putLong(region.getRegionFileOffset());
+        regionBuffer.putLong(region.getRegionStartOffset());
+        regionBuffer.putLong(region.getRegionEndOffset());
         regionBuffer.flip();
         BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), 
regionBuffer);
     }
@@ -145,6 +146,8 @@ public class FileRegionWriteReadUtils {
         int firstBufferIndex = regionBuffer.getInt();
         int numBuffers = regionBuffer.getInt();
         long firstBufferOffset = regionBuffer.getLong();
-        return new FixedSizeRegion(firstBufferIndex, firstBufferOffset, 
numBuffers);
+        long lastBufferEndOffset = regionBuffer.getLong();
+        return new FixedSizeRegion(
+                firstBufferIndex, firstBufferOffset, lastBufferEndOffset, 
numBuffers);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java
index 0a868c9002a..25920194597 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java
@@ -21,12 +21,14 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 
 /** {@link PartitionFileReader} defines the read logic for different types of 
shuffle files. */
 public interface PartitionFileReader {
@@ -40,16 +42,25 @@ public interface PartitionFileReader {
      * @param bufferIndex the index of buffer
      * @param memorySegment the empty buffer to store the read buffer
      * @param recycler the buffer recycler
-     * @return null if there is no data otherwise a buffer.
+     * @param readProgress the current read progress. The progress comes from the previous
+     *     ReadBufferResult. Note that the read progress should be implemented and provided by
+     *     Flink, and it should be directly tied to the file format. The field can be null if the
+     *     current file reader has no read progress.
+     * @param partialBuffer the previous partial buffer. The partial buffer is not null only when
+     *     the last read produced a partial buffer; it will be completed into a full buffer during
+     *     the read process.
+     * @return null if there is no data, otherwise the read buffer result.
      */
     @Nullable
-    Buffer readBuffer(
+    ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException;
 
     /**
@@ -68,14 +79,74 @@ public interface PartitionFileReader {
      * @param subpartitionId the subpartition id of the buffer
      * @param segmentId the segment id of the buffer
      * @param bufferIndex the index of buffer
+     * @param readProgress the current read progress. The progress comes from the previous
+     *     ReadBufferResult. Note that the read progress should be implemented and provided by
+     *     Flink, and it should be directly tied to the file format. The field can be null if the
+     *     current file reader has no read progress.
      * @return the priority of the {@link PartitionFileReader}.
      */
     long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex);
+            int bufferIndex,
+            @Nullable ReadProgress readProgress);
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /**
+     * This {@link ReadProgress} defines the read progress of the {@link 
PartitionFileReader}.
+     *
+     * <p>Note that the implementation of the interface should strongly bind 
with the implementation
+     * of {@link PartitionFileReader}.
+     */
+    interface ReadProgress {}
+
+    /**
+     * A wrapper class of the buffer reading result, including the read buffers, the hint of
+     * whether to continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The buffers produced by this read call. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller may continue reading the 
following buffers. Note
+         * that this hint is merely a recommendation and not obligatory. 
Following the hint while
+         * reading buffers may improve performance.
+         */
+        private final boolean continuousReadSuggested;
+
+        /**
+         * The read progress state.
+         *
+         * <p>Note that the field can be null if the current file reader has no read progress
+         * state when reading buffers.
+         */
+        @Nullable private final ReadProgress readProgress;
+
+        public ReadBufferResult(
+                List<Buffer> readBuffers,
+                boolean continuousReadSuggested,
+                @Nullable ReadProgress readProgress) {
+            this.readBuffers = readBuffers;
+            this.continuousReadSuggested = continuousReadSuggested;
+            this.readProgress = readProgress;
+        }
+
+        public List<Buffer> getReadBuffers() {
+            return readBuffers;
+        }
+
+        public boolean continuousReadSuggested() {
+            return continuousReadSuggested;
+        }
+
+        @Nullable
+        public ReadProgress getReadProgress() {
+            return readProgress;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
index 01cc9086d56..d6a08da5fb7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java
@@ -173,6 +173,8 @@ public class ProducerMergedPartitionFileIndex {
                         new FixedSizeRegion(
                                 firstBufferInRegion.getBufferIndex(),
                                 firstBufferInRegion.getFileOffset(),
+                                lastBufferInRegion.getFileOffset()
+                                        + 
lastBufferInRegion.getBufferSizeBytes(),
                                 lastBufferInRegion.getBufferIndex()
                                         - firstBufferInRegion.getBufferIndex()
                                         + 1));
@@ -193,10 +195,13 @@ public class ProducerMergedPartitionFileIndex {
         /** The file offset that the buffer begin with. */
         private final long fileOffset;
 
-        FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset) {
+        private final long bufferSizeBytes;
+
+        FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset, 
long bufferSizeBytes) {
             this.subpartitionId = subpartitionId;
             this.bufferIndex = bufferIndex;
             this.fileOffset = fileOffset;
+            this.bufferSizeBytes = bufferSizeBytes;
         }
 
         int getSubpartitionId() {
@@ -210,6 +215,10 @@ public class ProducerMergedPartitionFileIndex {
         long getFileOffset() {
             return fileOffset;
         }
+
+        long getBufferSizeBytes() {
+            return bufferSizeBytes;
+        }
     }
 
     /**
@@ -257,20 +266,28 @@ public class ProducerMergedPartitionFileIndex {
      */
     public static class FixedSizeRegion implements 
FileDataIndexRegionHelper.Region {
 
-        public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
+        public static final int REGION_SIZE =
+                Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES;
 
         /** The buffer index of first buffer. */
         private final int firstBufferIndex;
 
         /** The file offset of the region. */
-        private final long regionFileOffset;
+        private final long regionStartOffset;
+
+        private final long regionEndOffset;
 
         /** The number of buffers that the region contains. */
         private final int numBuffers;
 
-        public FixedSizeRegion(int firstBufferIndex, long regionFileOffset, 
int numBuffers) {
+        public FixedSizeRegion(
+                int firstBufferIndex,
+                long regionStartOffset,
+                long regionEndOffset,
+                int numBuffers) {
             this.firstBufferIndex = firstBufferIndex;
-            this.regionFileOffset = regionFileOffset;
+            this.regionStartOffset = regionStartOffset;
+            this.regionEndOffset = regionEndOffset;
             this.numBuffers = numBuffers;
         }
 
@@ -286,8 +303,13 @@ public class ProducerMergedPartitionFileIndex {
         }
 
         @Override
-        public long getRegionFileOffset() {
-            return regionFileOffset;
+        public long getRegionStartOffset() {
+            return regionStartOffset;
+        }
+
+        @Override
+        public long getRegionEndOffset() {
+            return regionEndOffset;
         }
 
         @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
index 00b8bd49db4..702a67213cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java
@@ -22,25 +22,34 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferHeader;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
-import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The implementation of {@link PartitionFileReader} with producer-merge mode. 
In this mode, the
@@ -51,25 +60,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ProducerMergedPartitionFileReader implements PartitionFileReader {
 
-    /**
-     * Max number of caches.
-     *
-     * <p>The constant defines the maximum number of caches that can be 
created. Its value is set to
-     * 10000, which is considered sufficient for most parallel jobs. Each 
cache only contains
-     * references and numerical variables and occupies a minimal amount of 
memory so the value is
-     * not excessively large.
-     */
-    private static final int DEFAULT_MAX_CACHE_NUM = 10000;
-
-    /**
-     * Buffer offset caches stored in map.
-     *
-     * <p>The key is the combination of {@link TieredStorageSubpartitionId} 
and buffer index. The
-     * value is the buffer offset cache, which includes file offset of the 
buffer index, the region
-     * containing the buffer index and next buffer index to consume.
-     */
-    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, 
BufferOffsetCache>
-            bufferOffsetCaches;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);
 
     private final ByteBuffer reusedHeaderBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
 
@@ -77,62 +69,63 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
 
     private final ProducerMergedPartitionFileIndex dataIndex;
 
-    private final int maxCacheNumber;
-
     private volatile FileChannel fileChannel;
 
-    /** The current number of caches. */
-    private int numCaches;
-
-    ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
-        this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM);
-    }
-
     @VisibleForTesting
     ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int 
maxCacheNumber) {
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
         this.dataFilePath = dataFilePath;
         this.dataIndex = dataIndex;
-        this.bufferOffsetCaches = new HashMap<>();
-        this.maxCacheNumber = maxCacheNumber;
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
-        if (!cache.isPresent()) {
+
+        // Get the file range to read: start offset (inclusive) and end offset (exclusive)
+        Tuple2<Long, Long> startAndEndOffset =
+                getReadStartAndEndOffset(subpartitionId, bufferIndex, 
readProgress, partialBuffer);
+        if (startAndEndOffset == null) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
-            }
+        long readStartOffset = startAndEndOffset.f0;
+        long readEndOffset = startAndEndOffset.f1;
+
+        // Clamp before narrowing to int so a large region cannot overflow the cast.
+        int numBytesToRead = (int) Math.min(memorySegment.size(), readEndOffset - readStartOffset);
+
+        if (numBytesToRead == 0) {
+            return null;
         }
-        return buffer;
+
+        List<Buffer> readBuffers = new LinkedList<>();
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+        // Read the file data into the memory segment; the read size is numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        // Slice the read memory segment into multiple small network buffers and add them to
+        // readBuffers
+        Tuple2<Integer, Integer> partial =
+                sliceBuffer(byteBuffer, memorySegment, partialBuffer, 
recycler, readBuffers);
+
+        return getReadBufferResult(
+                readBuffers,
+                readStartOffset,
+                readEndOffset,
+                numBytesToRead,
+                partial.f0,
+                partial.f1);
     }
 
     @Override
@@ -140,12 +133,18 @@ public class ProducerMergedPartitionFileReader implements 
PartitionFileReader {
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            @Nullable ReadProgress readProgress) {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        return tryGetCache(cacheKey, false)
-                .map(BufferOffsetCache::getFileOffset)
+
+        ProducerMergedReadProgress progress = 
convertToCurrentReadProgress(readProgress);
+        if (progress != null
+                && progress.getCurrentBufferOffset() != 
progress.getEndOfRegionOffset()) {
+            return progress.getCurrentBufferOffset();
+        }
+        return dataIndex
+                .getRegion(subpartitionId, bufferIndex)
+                
.map(ProducerMergedPartitionFileIndex.FixedSizeRegion::getRegionStartOffset)
                 .orElse(Long.MAX_VALUE);
     }
 
@@ -176,91 +175,217 @@ public class ProducerMergedPartitionFileReader 
implements PartitionFileReader {
     }
 
     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment to multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and 
subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be 
created using the
-     * data index and returned.
+     * <p>Note that although the method splits the data into multiple buffers, the sliced
+     * buffers still share the same underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, 
otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, it points to the 
underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the list that receives the sliced buffers
+     * @return the first field is the total number of sliced bytes, the second field is the number
+     *     of bytes of the partial buffer
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean 
removeKey) {
-        BufferOffsetCache bufferOffsetCache = 
bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, 
region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<Integer, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            MemorySegment memorySegment,
+            @Nullable CompositeBuffer partialBuffer,
+            BufferRecycler bufferRecycler,
+            List<Buffer> readBuffers) {
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, 
bufferRecycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        try {
+            int numSlicedBytes = 0;
+            if (partialBuffer != null) {
+                // If there is a previous small partial buffer, the current 
read operation should
+                // read additional data and combine it with the existing 
partial to construct a new
+                // complete buffer
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + 
partialBuffer.missingLength();
+                int numPartialBytes = partialBuffer.missingLength();
+                partialBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), 
numPartialBytes));
+                numSlicedBytes += numPartialBytes;
+                byteBuffer.position(position);
+                readBuffers.add(partialBuffer);
+            }
+
+            partialBuffer = null;
+            while (byteBuffer.hasRemaining()) {
+                // Parse the small buffer's header
+                BufferHeader header = parseBufferHeader(byteBuffer);
+                if (header == null) {
+                    // If the remaining data length in the buffer is not 
enough to construct a new
+                    // complete buffer header, drop it directly.
+                    break;
+                } else {
+                    numSlicedBytes += HEADER_LENGTH;
+                }
+
+                if (header.getLength() <= byteBuffer.remaining()) {
+                    // The remaining data length in the buffer is enough to 
generate a new small
+                    // sliced network buffer. The small sliced buffer is not a 
partial buffer, we
+                    // should read the slice of the buffer directly
+                    buffer.retainBuffer();
+                    ReadOnlySlicedNetworkBuffer slicedBuffer =
+                            buffer.readOnlySlice(byteBuffer.position(), 
header.getLength());
+                    slicedBuffer.setDataType(header.getDataType());
+                    slicedBuffer.setCompressed(header.isCompressed());
+                    byteBuffer.position(byteBuffer.position() + 
header.getLength());
+                    numSlicedBytes += header.getLength();
+                    readBuffers.add(slicedBuffer);
+                } else {
+                    // The remaining data length in the buffer is smaller than 
the actual length of
+                    // the buffer, so we should generate a new partial buffer, 
allowing for
+                    // generating a new complete buffer during the next read 
operation
+                    buffer.retainBuffer();
+                    int numPartialBytes = byteBuffer.remaining();
+                    numSlicedBytes += numPartialBytes;
+                    partialBuffer = new CompositeBuffer(header);
+                    partialBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), 
numPartialBytes));
+                    readBuffers.add(partialBuffer);
+                    break;
+                }
             }
-            return Optional.of(bufferOffsetCache);
+            return Tuple2.of(numSlicedBytes, 
getPartialBufferReadBytes(partialBuffer));
+        } catch (Throwable throwable) {
+            LOG.error("Failed to slice the read buffer {}.", byteBuffer, 
throwable);
+            throw throwable;
+        } finally {
+            buffer.recycleBuffer();
         }
     }
 
     /**
-     * The {@link BufferOffsetCache} represents the file offset cache for a 
buffer index. Each cache
-     * includes file offset of the buffer index, the region containing the 
buffer index and next
-     * buffer index to consume.
+     * Return a tuple of the start and end file offset, or return null if the 
buffer is not found in
+     * the data index.
      */
-    private class BufferOffsetCache {
+    @Nullable
+    private Tuple2<Long, Long> getReadStartAndEndOffset(
+            TieredStorageSubpartitionId subpartitionId,
+            int bufferIndex,
+            @Nullable ReadProgress currentReadProgress,
+            @Nullable CompositeBuffer partialBuffer) {
+        ProducerMergedReadProgress readProgress = 
convertToCurrentReadProgress(currentReadProgress);
+        long readStartOffset;
+        long readEndOffset;
+        if (readProgress == null
+                || readProgress.getCurrentBufferOffset() == 
readProgress.getEndOfRegionOffset()) {
+            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
+                    dataIndex.getRegion(subpartitionId, bufferIndex);
+            if (!regionOpt.isPresent()) {
+                return null;
+            }
+            readStartOffset = regionOpt.get().getRegionStartOffset();
+            readEndOffset = regionOpt.get().getRegionEndOffset();
+        } else {
+            readStartOffset =
+                    readProgress.getCurrentBufferOffset()
+                            + getPartialBufferReadBytes(partialBuffer);
+            readEndOffset = readProgress.getEndOfRegionOffset();
+        }
 
-        private final ProducerMergedPartitionFileIndex.FixedSizeRegion region;
+        checkState(readStartOffset <= readEndOffset);
+        return Tuple2.of(readStartOffset, readEndOffset);
+    }
 
-        private long fileOffset;
+    private static ReadBufferResult getReadBufferResult(
+            List<Buffer> readBuffers,
+            long readStartOffset,
+            long readEndOffset,
+            int numBytesToRead,
+            int numBytesRealRead,
+            int numBytesReadPartialBuffer) {
+        boolean shouldContinueRead = readStartOffset + numBytesRealRead < 
readEndOffset;
+        ProducerMergedReadProgress readProgress =
+                new ProducerMergedReadProgress(
+                        readStartOffset + numBytesRealRead - 
numBytesReadPartialBuffer,
+                        readEndOffset);
+        checkState(
+                numBytesRealRead <= numBytesToRead
+                        && numBytesToRead - numBytesRealRead < HEADER_LENGTH);
 
-        private int nextBufferIndex;
+        return new ReadBufferResult(readBuffers, shouldContinueRead, 
readProgress);
+    }
 
-        private BufferOffsetCache(
-                int bufferIndex, 
ProducerMergedPartitionFileIndex.FixedSizeRegion region) {
-            this.nextBufferIndex = bufferIndex;
-            this.region = region;
-            moveFileOffsetToBuffer(bufferIndex);
+    private void readFileDataToBuffer(
+            MemorySegment memorySegment, BufferRecycler recycler, ByteBuffer 
byteBuffer)
+            throws IOException {
+        try {
+            BufferReaderWriterUtil.readByteBufferFully(fileChannel, 
byteBuffer);
+            byteBuffer.flip();
+        } catch (Throwable throwable) {
+            recycler.recycle(memorySegment);
+            throw throwable;
         }
+    }
 
-        /**
-         * Get the file offset.
-         *
-         * @return the file offset.
-         */
-        private long getFileOffset() {
-            return fileOffset;
+    private static int getPartialBufferReadBytes(@Nullable CompositeBuffer 
partialBuffer) {
+        return partialBuffer == null ? 0 : partialBuffer.readableBytes() + 
HEADER_LENGTH;
+    }
+
+    private static ProducerMergedReadProgress convertToCurrentReadProgress(
+            @Nullable ReadProgress readProgress) {
+        if (readProgress == null) {
+            return null;
         }
+        checkState(readProgress instanceof ProducerMergedReadProgress);
+        return (ProducerMergedReadProgress) readProgress;
+    }
 
-        /**
-         * Updates the {@link BufferOffsetCache} upon the retrieval of a 
buffer from the file using
-         * the file offset in the {@link BufferOffsetCache}.
-         *
-         * @param bufferSize denotes the size of the buffer.
-         * @return return true if there are remaining buffers in the region, 
otherwise return false.
-         */
-        private boolean advance(long bufferSize) {
-            nextBufferIndex++;
-            fileOffset += bufferSize;
-            return nextBufferIndex < (region.getFirstBufferIndex() + 
region.getNumBuffers());
+    private BufferHeader parseBufferHeader(ByteBuffer buffer) {
+        checkArgument(reusedHeaderBuffer.position() == 0);
+
+        BufferHeader header = null;
+        try {
+            if (buffer.remaining() >= HEADER_LENGTH) {
+                // The remaining data length in the buffer is enough to 
construct a new complete
+                // buffer, parse and create a new buffer header
+                header = BufferReaderWriterUtil.parseBufferHeader(buffer);
+            }
+            // If the remaining data length in the buffer is smaller than the 
header. Drop it
+            // directly
+        } catch (Throwable throwable) {
+            reusedHeaderBuffer.clear();
+            LOG.error("Failed to parse buffer header.", throwable);
+            throw throwable;
         }
+        reusedHeaderBuffer.clear();
+        return header;
+    }
 
+    /**
+     * The implementation of {@link PartitionFileReader.ReadProgress} mainly 
includes current
+     * reading offset, end of read offset, etc.
+     */
+    public static class ProducerMergedReadProgress implements 
PartitionFileReader.ReadProgress {
         /**
-         * Relocates the file channel offset to the position of the specified 
buffer index.
-         *
-         * @param bufferIndex denotes the index of the buffer.
+         * The current reading buffer file offset. Note the offset does not 
contain the length of
+         * the partial buffer, because the partial buffer may be dropped at 
anytime.
          */
-        private void moveFileOffsetToBuffer(int bufferIndex) {
-            try {
-                
checkNotNull(fileChannel).position(region.getRegionFileOffset());
-                for (int i = 0; i < (bufferIndex - 
region.getFirstBufferIndex()); ++i) {
-                    positionToNextBuffer(fileChannel, reusedHeaderBuffer);
-                }
-                fileOffset = fileChannel.position();
-            } catch (IOException e) {
-                ExceptionUtils.rethrow(e, "Failed to move file offset");
-            }
+        private final long currentBufferOffset;
+
+        /** The end of region file offset. */
+        private final long endOfRegionOffset;
+
+        public ProducerMergedReadProgress(long currentBufferOffset, long 
endOfRegionOffset) {
+            this.currentBufferOffset = currentBufferOffset;
+            this.endOfRegionOffset = endOfRegionOffset;
+        }
+
+        public long getCurrentBufferOffset() {
+            return currentBufferOffset;
+        }
+
+        public long getEndOfRegionOffset() {
+            return endOfRegionOffset;
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java
index e30def7fcd0..57ce19eb2c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java
@@ -156,7 +156,8 @@ public class ProducerMergedPartitionFileWriter implements 
PartitionFileWriter {
                             new ProducerMergedPartitionFileIndex.FlushedBuffer(
                                     subpartitionId,
                                     bufferWithIndex.f1,
-                                    totalBytesWritten + expectedBytes));
+                                    totalBytesWritten + expectedBytes,
+                                    buffer.readableBytes() + 
BufferReaderWriterUtil.HEADER_LENGTH));
                     expectedBytes += buffer.readableBytes() + 
BufferReaderWriterUtil.HEADER_LENGTH;
                 }
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java
index 9f79a78cbd8..8c07c505949 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java
@@ -25,17 +25,21 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferHeader;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import org.apache.flink.util.ExceptionUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -74,13 +78,15 @@ public class SegmentPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
 
         // Get the channel of the segment file for a subpartition.
@@ -109,7 +115,8 @@ public class SegmentPartitionFileReader implements 
PartitionFileReader {
         if (bufferHeaderResult == -1) {
             channel.close();
             openedChannelAndSegmentIds.get(partitionId).remove(subpartitionId);
-            return new NetworkBuffer(memorySegment, recycler, 
Buffer.DataType.END_OF_SEGMENT);
+            return getSingletonReadResult(
+                    new NetworkBuffer(memorySegment, recycler, 
Buffer.DataType.END_OF_SEGMENT));
         }
         reusedHeaderBuffer.flip();
         BufferHeader header = parseBufferHeader(reusedHeaderBuffer);
@@ -119,8 +126,13 @@ public class SegmentPartitionFileReader implements 
PartitionFileReader {
             throw new IOException("The length of data buffer is illegal.");
         }
         Buffer.DataType dataType = header.getDataType();
-        return new NetworkBuffer(
-                memorySegment, recycler, dataType, header.isCompressed(), 
header.getLength());
+        return getSingletonReadResult(
+                new NetworkBuffer(
+                        memorySegment,
+                        recycler,
+                        dataType,
+                        header.isCompressed(),
+                        header.getLength()));
     }
 
     @Override
@@ -128,7 +140,8 @@ public class SegmentPartitionFileReader implements 
PartitionFileReader {
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            @Nullable ReadProgress readProgress) {
         // noop
         return -1;
     }
@@ -166,4 +179,8 @@ public class SegmentPartitionFileReader implements 
PartitionFileReader {
                             }
                         });
     }
+
+    private static ReadBufferResult getSingletonReadResult(NetworkBuffer 
buffer) {
+        return new ReadBufferResult(Collections.singletonList(buffer), false, 
null);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
index 5e8b405c531..bceb5e55c0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
@@ -35,6 +36,7 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
@@ -338,6 +340,8 @@ public class DiskIOScheduler implements Runnable, 
BufferRecycler, NettyServicePr
 
         private boolean isFailed;
 
+        @Nullable private PartitionFileReader.ReadProgress readProgress;
+
         private ScheduledSubpartitionReader(
                 TieredStorageSubpartitionId subpartitionId,
                 NettyConnectionWriter nettyConnectionWriter) {
@@ -354,34 +358,46 @@ public class DiskIOScheduler implements Runnable, 
BufferRecycler, NettyServicePr
                                 + subpartitionId
                                 + " has already been failed.");
             }
-            while (!buffers.isEmpty()
-                    && nettyConnectionWriter.numQueuedBufferPayloads() < 
maxBufferReadAhead
-                    && nextSegmentId >= 0) {
-                MemorySegment memorySegment = buffers.poll();
-                Buffer buffer;
-                try {
-                    if ((buffer =
-                                    partitionFileReader.readBuffer(
-                                            partitionId,
-                                            subpartitionId,
-                                            nextSegmentId,
-                                            nextBufferIndex,
-                                            memorySegment,
-                                            recycler))
-                            == null) {
+
+            CompositeBuffer partialBuffer = null;
+            boolean shouldContinueRead = true;
+            try {
+                while (!buffers.isEmpty() && shouldContinueRead && 
nextSegmentId >= 0) {
+                    MemorySegment memorySegment = buffers.poll();
+                    PartitionFileReader.ReadBufferResult readBufferResult;
+                    try {
+                        readBufferResult =
+                                partitionFileReader.readBuffer(
+                                        partitionId,
+                                        subpartitionId,
+                                        nextSegmentId,
+                                        nextBufferIndex,
+                                        memorySegment,
+                                        recycler,
+                                        readProgress,
+                                        partialBuffer);
+                        if (readBufferResult == null) {
+                            buffers.add(memorySegment);
+                            break;
+                        }
+                    } catch (Throwable throwable) {
+                        buffers.add(memorySegment);
+                        throw throwable;
+                    }
+
+                    List<Buffer> readBuffers = 
readBufferResult.getReadBuffers();
+                    shouldContinueRead = 
readBufferResult.continuousReadSuggested();
+                    readProgress = readBufferResult.getReadProgress();
+                    if (readBuffers.isEmpty()) {
                         buffers.add(memorySegment);
                         break;
                     }
-                } catch (Throwable throwable) {
-                    buffers.add(memorySegment);
-                    throw throwable;
+
+                    partialBuffer = 
writeFullBuffersAndGetPartialBuffer(readBuffers);
                 }
-                writeToNettyConnectionWriter(
-                        NettyPayload.newBuffer(
-                                buffer, nextBufferIndex++, 
subpartitionId.getSubpartitionId()));
-                if (buffer.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
-                    nextSegmentId = -1;
-                    updateSegmentId();
+            } finally {
+                if (partialBuffer != null) {
+                    partialBuffer.recycleBuffer();
                 }
             }
         }
@@ -400,7 +416,39 @@ public class DiskIOScheduler implements Runnable, 
BufferRecycler, NettyServicePr
                     nextSegmentId < 0
                             ? Long.MAX_VALUE
                             : partitionFileReader.getPriority(
-                                    partitionId, subpartitionId, 
nextSegmentId, nextBufferIndex);
+                                    partitionId,
+                                    subpartitionId,
+                                    nextSegmentId,
+                                    nextBufferIndex,
+                                    readProgress);
+        }
+
+        private CompositeBuffer 
writeFullBuffersAndGetPartialBuffer(List<Buffer> readBuffers) {
+            CompositeBuffer partialBuffer = null;
+            for (int i = 0; i < readBuffers.size(); i++) {
+                Buffer readBuffer = readBuffers.get(i);
+                if (i == readBuffers.size() - 1 && 
isPartialBuffer(readBuffer)) {
+                    partialBuffer = (CompositeBuffer) readBuffer;
+                    continue;
+                }
+                writeNettyBufferAndUpdateSegmentId(readBuffer);
+            }
+            return partialBuffer;
+        }
+
+        private boolean isPartialBuffer(Buffer readBuffer) {
+            return readBuffer instanceof CompositeBuffer
+                    && ((CompositeBuffer) readBuffer).missingLength() > 0;
+        }
+
+        private void writeNettyBufferAndUpdateSegmentId(Buffer readBuffer) {
+            writeToNettyConnectionWriter(
+                    NettyPayload.newBuffer(
+                            readBuffer, nextBufferIndex++, 
subpartitionId.getSubpartitionId()));
+            if (readBuffer.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
+                nextSegmentId = -1;
+                updateSegmentId();
+            }
         }
 
         private void writeToNettyConnectionWriter(NettyPayload nettyPayload) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
index a43d3bd304b..12d046e8325 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
@@ -32,9 +32,13 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** The data client is used to fetch data from remote tier. */
 public class RemoteTierConsumerAgent implements TierConsumerAgent {
 
@@ -87,21 +91,26 @@ public class RemoteTierConsumerAgent implements 
TierConsumerAgent {
 
         // Read buffer from the partition file in remote storage.
         MemorySegment memorySegment = 
MemorySegmentFactory.allocateUnpooledSegment(bufferSizeBytes);
-        Buffer buffer = null;
+        PartitionFileReader.ReadBufferResult readBufferResult = null;
         try {
-            buffer =
+            readBufferResult =
                     partitionFileReader.readBuffer(
                             partitionId,
                             subpartitionId,
                             segmentId,
                             currentBufferIndex,
                             memorySegment,
-                            FreeingBufferRecycler.INSTANCE);
+                            FreeingBufferRecycler.INSTANCE,
+                            null,
+                            null);
         } catch (IOException e) {
             memorySegment.free();
             ExceptionUtils.rethrow(e, "Failed to read buffer from partition 
file.");
         }
-        if (buffer != null) {
+        List<Buffer> readBuffers = 
checkNotNull(readBufferResult).getReadBuffers();
+        if (!readBuffers.isEmpty()) {
+            checkState(readBuffers.size() == 1);
+            Buffer buffer = readBuffers.get(0);
             currentBufferIndexAndSegmentIds
                     .get(partitionId)
                     .put(subpartitionId, Tuple2.of(++currentBufferIndex, 
segmentId));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index d9abab0b139..24d81230b30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -119,15 +119,18 @@ public class HybridShuffleTestUtils {
     }
 
     public static FileDataIndexRegionHelper.Region createSingleFixedSizeRegion(
-            int firstBufferIndex, long firstBufferOffset, int 
numBuffersPerRegion) {
+            int firstBufferIndex,
+            long firstBufferOffset,
+            long lastBufferEndOffset,
+            int numBuffersPerRegion) {
         return new ProducerMergedPartitionFileIndex.FixedSizeRegion(
-                firstBufferIndex, firstBufferOffset, numBuffersPerRegion);
+                firstBufferIndex, firstBufferOffset, lastBufferEndOffset, 
numBuffersPerRegion);
     }
 
     public static void assertRegionEquals(
             FileDataIndexRegionHelper.Region expected, 
FileDataIndexRegionHelper.Region region) {
         
assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex());
-        
assertThat(region.getRegionFileOffset()).isEqualTo(expected.getRegionFileOffset());
+        
assertThat(region.getRegionStartOffset()).isEqualTo(expected.getRegionStartOffset());
         assertThat(region.getNumBuffers()).isEqualTo(expected.getNumBuffers());
         if (expected instanceof InternalRegion) {
             assertThat(region).isInstanceOf(InternalRegion.class);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
index 1ffd3620975..76ef63838af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java
@@ -68,7 +68,7 @@ class FileDataIndexCacheTest {
                 .hasValueSatisfying(
                         (region) -> {
                             
assertThat(region.getFirstBufferIndex()).isEqualTo(0);
-                            
assertThat(region.getRegionFileOffset()).isEqualTo(0);
+                            
assertThat(region.getRegionStartOffset()).isEqualTo(0);
                             assertThat(region.getNumBuffers()).isEqualTo(3);
                         });
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
index 8e04f24186e..4237aeff1db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleFixedSizeRegion;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.FixedSizeRegion.REGION_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -83,9 +84,9 @@ class FileRegionWriteReadUtilsTest {
     @Test
     void testReadPrematureEndOfFileForFixedSizeRegion(@TempDir Path tmpPath) 
throws Exception {
         FileChannel channel = tmpFileChannel(tmpPath);
-        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(REGION_SIZE);
         FileRegionWriteReadUtils.writeFixedSizeRegionToFile(
-                channel, buffer, createSingleFixedSizeRegion(0, 0L, 1));
+                channel, buffer, createSingleFixedSizeRegion(0, 0L, 10L, 1));
         channel.truncate(channel.position() - 1);
         buffer.flip();
         assertThatThrownBy(
@@ -98,8 +99,8 @@ class FileRegionWriteReadUtilsTest {
     @Test
     void testWriteAndReadFixedSizeRegion(@TempDir Path tmpPath) throws 
Exception {
         FileChannel channel = tmpFileChannel(tmpPath);
-        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
-        FileDataIndexRegionHelper.Region region = 
createSingleFixedSizeRegion(10, 100L, 1);
+        ByteBuffer buffer = 
FileRegionWriteReadUtils.allocateAndConfigureBuffer(REGION_SIZE);
+        FileDataIndexRegionHelper.Region region = 
createSingleFixedSizeRegion(10, 100L, 110L, 1);
         FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, buffer, 
region);
         buffer.flip();
         ProducerMergedPartitionFileIndex.FixedSizeRegion readRegion =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
index 29ee310fba8..3fd42272a38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java
@@ -39,6 +39,8 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
 
     private final Supplier<Long> getRegionFileOffsetSupplier;
 
+    private final Supplier<Long> getRegionFileEndOffsetSupplier;
+
     private final Supplier<Integer> getNumBuffersSupplier;
 
     private final Function<Integer, Boolean> containBufferFunction;
@@ -47,11 +49,13 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
             Supplier<Integer> getSizeSupplier,
             Supplier<Integer> getFirstBufferIndexSupplier,
             Supplier<Long> getRegionFileOffsetSupplier,
+            Supplier<Long> getRegionFileEndOffsetSupplier,
             Supplier<Integer> getNumBuffersSupplier,
             Function<Integer, Boolean> containBufferFunction) {
         this.getSizeSupplier = getSizeSupplier;
         this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier;
         this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier;
+        this.getRegionFileEndOffsetSupplier = getRegionFileEndOffsetSupplier;
         this.getNumBuffersSupplier = getNumBuffersSupplier;
         this.containBufferFunction = containBufferFunction;
     }
@@ -67,10 +71,15 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
     }
 
     @Override
-    public long getRegionFileOffset() {
+    public long getRegionStartOffset() {
         return getRegionFileOffsetSupplier.get();
     }
 
+    @Override
+    public long getRegionEndOffset() {
+        return getRegionFileEndOffsetSupplier.get();
+    }
+
     @Override
     public int getNumBuffers() {
         return getNumBuffersSupplier.get();
@@ -87,7 +96,7 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
         regionBuffer.clear();
         regionBuffer.putInt(region.getFirstBufferIndex());
         regionBuffer.putInt(region.getNumBuffers());
-        regionBuffer.putLong(region.getRegionFileOffset());
+        regionBuffer.putLong(region.getRegionStartOffset());
         regionBuffer.flip();
         BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), 
regionBuffer);
     }
@@ -130,6 +139,8 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
 
         private Supplier<Long> getRegionFileOffsetSupplier = () -> 0L;
 
+        private Supplier<Long> getRegionFileEndOffsetSupplier = () -> 0L;
+
         private Supplier<Integer> getNumBuffersSupplier = () -> 0;
 
         private Function<Integer, Boolean> containBufferFunction = bufferIndex 
-> false;
@@ -152,6 +163,12 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
             return this;
         }
 
+        public TestingFileDataIndexRegion.Builder 
setGetRegionFileEndOffsetSupplier(
+                Supplier<Long> getRegionFileEndOffsetSupplier) {
+            this.getRegionFileEndOffsetSupplier = 
getRegionFileEndOffsetSupplier;
+            return this;
+        }
+
         public TestingFileDataIndexRegion.Builder setGetNumBuffersSupplier(
                 Supplier<Integer> getNumBuffersSupplier) {
             this.getNumBuffersSupplier = getNumBuffersSupplier;
@@ -169,6 +186,7 @@ public class TestingFileDataIndexRegion implements 
FileDataIndexRegionHelper.Reg
                     getSizeSupplier,
                     getFirstBufferIndexSupplier,
                     getRegionFileOffsetSupplier,
+                    getRegionFileEndOffsetSupplier,
                     getNumBuffersSupplier,
                     containBufferFunction);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
index 98bccb530f3..8d453d00f2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +98,12 @@ class DiskIOSchedulerTest {
                                 .setReadBufferSupplier(
                                         (bufferIndex, segmentId) -> {
                                             
segmentIdFuture.complete(segmentId);
-                                            return 
BufferBuilderTestUtils.buildSomeBuffer(0);
+                                            return new 
PartitionFileReader.ReadBufferResult(
+                                                    Collections.singletonList(
+                                                            
BufferBuilderTestUtils.buildSomeBuffer(
+                                                                    0)),
+                                                    true,
+                                                    null);
                                         })
                                 .setReleaseNotifier(() -> 
readerReleaseFuture.complete(null))
                                 .setPrioritySupplier(subpartitionId -> (long) 
subpartitionId)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
index 615e2235643..a382511b53a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java
@@ -81,7 +81,7 @@ class ProducerMergedPartitionFileIndexTest {
                 boolean isNextRegionContinuous =
                         (j == 0 || random.nextBoolean()) && j != 
numBuffersPerSubpartition - 1;
                 flushedBuffers.add(
-                        new ProducerMergedPartitionFileIndex.FlushedBuffer(i, 
bufferIndex, 0));
+                        new ProducerMergedPartitionFileIndex.FlushedBuffer(i, 
bufferIndex, 0, 1));
                 bufferIndex++;
 
                 if (!isNextRegionContinuous) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
index fc29e07c045..04e808586fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
@@ -35,9 +36,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageTestUtils.generateBuffersToWrite;
@@ -54,7 +53,7 @@ class ProducerMergedPartitionFileReaderTest {
 
     private static final int DEFAULT_BUFFER_NUMBER = 5;
 
-    private static final int DEFAULT_BUFFER_SIZE = 3;
+    private static final int DEFAULT_BUFFER_SIZE = 10;
 
     private static final String DEFAULT_TEST_FILE_NAME = "testFile";
 
@@ -96,9 +95,9 @@ class ProducerMergedPartitionFileReaderTest {
     @Test
     void testReadBuffer() throws IOException {
         for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; 
++bufferIndex) {
-            Buffer buffer = readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID);
-            assertThat(buffer).isNotNull();
-            buffer.recycleBuffer();
+            List<Buffer> buffers = readBuffer(bufferIndex, 
DEFAULT_SUBPARTITION_ID);
+            assertThat(buffers).isNotNull();
+            buffers.forEach(Buffer::recycleBuffer);
         }
         MemorySegment memorySegment =
                 
MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
@@ -109,58 +108,90 @@ class ProducerMergedPartitionFileReaderTest {
                                 DEFAULT_SEGMENT_ID,
                                 DEFAULT_BUFFER_NUMBER + 1,
                                 memorySegment,
-                                FreeingBufferRecycler.INSTANCE))
+                                FreeingBufferRecycler.INSTANCE,
+                                null,
+                                null))
                 .isNull();
     }
 
     @Test
     void testGetPriority() throws IOException {
-        int currentFileOffset = 0;
-        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; 
++bufferIndex) {
-            Buffer buffer = readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID);
-            assertThat(buffer).isNotNull();
+        ProducerMergedPartitionFileReader.ProducerMergedReadProgress 
readProgress = null;
+        CompositeBuffer partialBuffer = null;
+        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ) {
+            PartitionFileReader.ReadBufferResult readBufferResult =
+                    readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID, 
readProgress, partialBuffer);
+            assertThat(readBufferResult).isNotNull();
+            assertThat(readBufferResult.getReadProgress())
+                    .isInstanceOf(
+                            
ProducerMergedPartitionFileReader.ProducerMergedReadProgress.class);
+            readProgress =
+                    
(ProducerMergedPartitionFileReader.ProducerMergedReadProgress)
+                            readBufferResult.getReadProgress();
+            for (Buffer buffer : readBufferResult.getReadBuffers()) {
+                if (buffer instanceof CompositeBuffer) {
+                    partialBuffer = (CompositeBuffer) buffer;
+                    if (partialBuffer.missingLength() == 0) {
+                        bufferIndex++;
+                        partialBuffer.recycleBuffer();
+                        partialBuffer = null;
+                    }
+                } else {
+                    bufferIndex++;
+                    buffer.recycleBuffer();
+                }
+            }
+
+            long expectedBufferOffset;
+            if (bufferIndex < DEFAULT_BUFFER_NUMBER) {
+                expectedBufferOffset =
+                        readProgress == null ? 0 : 
readProgress.getCurrentBufferOffset();
+            } else {
+                expectedBufferOffset = Long.MAX_VALUE;
+            }
             assertThat(
                             partitionFileReader.getPriority(
                                     DEFAULT_PARTITION_ID,
                                     DEFAULT_SUBPARTITION_ID,
                                     DEFAULT_SEGMENT_ID,
-                                    bufferIndex))
-                    .isEqualTo(currentFileOffset);
-            currentFileOffset += (HEADER_LENGTH + DEFAULT_BUFFER_SIZE);
-            buffer.recycleBuffer();
+                                    bufferIndex,
+                                    readProgress))
+                    .isEqualTo(expectedBufferOffset);
         }
     }
 
     @Test
-    void testCacheExceedMaxNumber() throws IOException {
-        int cacheNumber = 3;
-        AtomicInteger indexQueryTime = new AtomicInteger(0);
-        TestingProducerMergedPartitionFileIndex partitionFileIndex =
-                new TestingProducerMergedPartitionFileIndex.Builder()
-                        .setIndexFilePath(new File(tempFolder.toFile(), 
"test-Index").toPath())
-                        .setGetRegionFunction(
-                                (subpartitionId, integer) -> {
-                                    indexQueryTime.incrementAndGet();
-                                    return Optional.of(
-                                            new 
ProducerMergedPartitionFileIndex.FixedSizeRegion(
-                                                    0, 0, 2));
-                                })
-                        .build();
-        partitionFileReader =
-                new ProducerMergedPartitionFileReader(
-                        testFilePath, partitionFileIndex, cacheNumber);
-        // Read different subpartitions from the reader and make cache reach 
max number.
-        for (int subpartitionId = 0; subpartitionId < cacheNumber * 2; 
++subpartitionId) {
-            assertThat(
-                            readBuffer(
-                                    subpartitionId < cacheNumber ? 0 : 1,
-                                    new 
TieredStorageSubpartitionId(subpartitionId % cacheNumber)))
-                    .isNotNull();
+    void testReadProgress() throws IOException {
+        long currentFileOffset = 0;
+        ProducerMergedPartitionFileReader.ProducerMergedReadProgress 
readProgress = null;
+        CompositeBuffer partialBuffer = null;
+        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ) {
+            PartitionFileReader.ReadBufferResult readBufferResult =
+                    readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID, 
readProgress, partialBuffer);
+            assertThat(readBufferResult).isNotNull();
+            assertThat(readBufferResult.getReadProgress())
+                    .isInstanceOf(
+                            
ProducerMergedPartitionFileReader.ProducerMergedReadProgress.class);
+            readProgress =
+                    
(ProducerMergedPartitionFileReader.ProducerMergedReadProgress)
+                            readBufferResult.getReadProgress();
+            for (Buffer buffer : readBufferResult.getReadBuffers()) {
+                if (buffer instanceof CompositeBuffer) {
+                    partialBuffer = (CompositeBuffer) buffer;
+                    if (partialBuffer.missingLength() == 0) {
+                        bufferIndex++;
+                        currentFileOffset += partialBuffer.readableBytes() + 
HEADER_LENGTH;
+                        partialBuffer.recycleBuffer();
+                        partialBuffer = null;
+                    }
+                } else {
+                    bufferIndex++;
+                    currentFileOffset += buffer.readableBytes() + 
HEADER_LENGTH;
+                    buffer.recycleBuffer();
+                }
+            }
+            
assertThat(readProgress.getCurrentBufferOffset()).isEqualTo(currentFileOffset);
         }
-        // The following buffer reading from other subpartitions can only 
query the index.
-        assertThat(readBuffer(0, new 
TieredStorageSubpartitionId(3))).isNotNull();
-        assertThat(readBuffer(0, new 
TieredStorageSubpartitionId(3))).isNotNull();
-        assertThat(indexQueryTime).hasValue(5);
     }
 
     @Test
@@ -170,7 +201,16 @@ class ProducerMergedPartitionFileReaderTest {
         assertThat(testFilePath.toFile().exists()).isFalse();
     }
 
-    private Buffer readBuffer(int bufferIndex, TieredStorageSubpartitionId 
subpartitionId)
+    private List<Buffer> readBuffer(int bufferIndex, 
TieredStorageSubpartitionId subpartitionId)
+            throws IOException {
+        return readBuffer(bufferIndex, subpartitionId, null, 
null).getReadBuffers();
+    }
+
+    private PartitionFileReader.ReadBufferResult readBuffer(
+            int bufferIndex,
+            TieredStorageSubpartitionId subpartitionId,
+            PartitionFileReader.ReadProgress readProgress,
+            CompositeBuffer partialBuffer)
             throws IOException {
         MemorySegment memorySegment =
                 
MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
@@ -180,6 +220,8 @@ class ProducerMergedPartitionFileReaderTest {
                 DEFAULT_SEGMENT_ID,
                 bufferIndex,
                 memorySegment,
-                FreeingBufferRecycler.INSTANCE);
+                FreeingBufferRecycler.INSTANCE,
+                readProgress,
+                partialBuffer);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
index 1ba7fb36070..766e0827a37 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
@@ -102,12 +102,12 @@ class SegmentPartitionFileReaderTest {
     void testGetPriority() throws IOException {
         assertThat(
                         partitionFileReader.getPriority(
-                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 
0, 0))
+                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 
0, 0, null))
                 .isEqualTo(-1);
         assertThat(readBuffer(0, DEFAULT_SUBPARTITION_ID, 0)).isNotNull();
         assertThat(
                         partitionFileReader.getPriority(
-                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 
0, 1))
+                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 
0, 1, null))
                 .isEqualTo(-1);
     }
 
@@ -116,12 +116,19 @@ class SegmentPartitionFileReaderTest {
             throws IOException {
         MemorySegment memorySegment =
                 
MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
-        return partitionFileReader.readBuffer(
-                DEFAULT_PARTITION_ID,
-                subpartitionId,
-                segmentId,
-                bufferIndex,
-                memorySegment,
-                FreeingBufferRecycler.INSTANCE);
+        PartitionFileReader.ReadBufferResult readBufferResult =
+                partitionFileReader.readBuffer(
+                        DEFAULT_PARTITION_ID,
+                        subpartitionId,
+                        segmentId,
+                        bufferIndex,
+                        memorySegment,
+                        FreeingBufferRecycler.INSTANCE,
+                        null,
+                        null);
+        if (readBufferResult == null) {
+            return null;
+        }
+        return readBufferResult.getReadBuffers().get(0);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
index 572f3b97d3b..c88b68ac486 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -31,14 +33,14 @@ import java.util.function.Function;
 /** Testing implementation for {@link PartitionFileReader}. */
 public class TestingPartitionFileReader implements PartitionFileReader {
 
-    private final BiFunction<Integer, Integer, Buffer> readBufferFunction;
+    private final BiFunction<Integer, Integer, ReadBufferResult> 
readBufferFunction;
 
     private final Function<Integer, Long> getPriorityFunction;
 
     private final Runnable releaseRunnable;
 
     private TestingPartitionFileReader(
-            BiFunction<Integer, Integer, Buffer> readBufferFunction,
+            BiFunction<Integer, Integer, ReadBufferResult> readBufferFunction,
             Function<Integer, Long> getPriorityFunction,
             Runnable releaseRunnable) {
         this.readBufferFunction = readBufferFunction;
@@ -47,13 +49,15 @@ public class TestingPartitionFileReader implements 
PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
         return readBufferFunction.apply(bufferIndex, segmentId);
     }
@@ -63,7 +67,8 @@ public class TestingPartitionFileReader implements 
PartitionFileReader {
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            @Nullable ReadProgress readProgress) {
         return getPriorityFunction.apply(subpartitionId.getSubpartitionId());
     }
 
@@ -74,7 +79,7 @@ public class TestingPartitionFileReader implements 
PartitionFileReader {
 
     /** Builder for {@link TestingPartitionFileReader}. */
     public static class Builder {
-        private BiFunction<Integer, Integer, Buffer> readBufferSupplier =
+        private BiFunction<Integer, Integer, ReadBufferResult> 
readBufferSupplier =
                 (bufferIndex, segmentId) -> null;
 
         private Function<Integer, Long> prioritySupplier = bufferIndex -> 0L;
@@ -82,7 +87,7 @@ public class TestingPartitionFileReader implements 
PartitionFileReader {
         private Runnable releaseNotifier = () -> {};
 
         public Builder setReadBufferSupplier(
-                BiFunction<Integer, Integer, Buffer> readBufferSupplier) {
+                BiFunction<Integer, Integer, ReadBufferResult> 
readBufferSupplier) {
             this.readBufferSupplier = readBufferSupplier;
             return this;
         }

Reply via email to