This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d100ab65367fe0b3d74a9901bcaaa26049c996b0
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Tue Aug 22 15:34:52 2023 +0800

    [FLINK-32870][network] Tiered storage supports reading multiple small buffers by reading and slicing one large buffer

    This closes #23255
---
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../partition/hybrid/HsFileDataIndexImpl.java      |   7 +-
 .../hybrid/index/FileDataIndexRegionHelper.java    |   5 +-
 .../hybrid/index/FileRegionWriteReadUtils.java     |   9 +-
 .../hybrid/tiered/file/PartitionFileReader.java    |  79 ++++-
 .../file/ProducerMergedPartitionFileIndex.java     |  36 +-
 .../file/ProducerMergedPartitionFileReader.java    | 383 ++++++++++++++-------
 .../file/ProducerMergedPartitionFileWriter.java    |   3 +-
 .../tiered/file/SegmentPartitionFileReader.java    |  29 +-
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   |  98 ++++--
 .../tier/remote/RemoteTierConsumerAgent.java       |  17 +-
 .../partition/hybrid/HybridShuffleTestUtils.java   |   9 +-
 .../hybrid/index/FileDataIndexCacheTest.java       |   2 +-
 .../hybrid/index/FileRegionWriteReadUtilsTest.java |   9 +-
 .../hybrid/index/TestingFileDataIndexRegion.java   |  22 +-
 .../hybrid/tiered/file/DiskIOSchedulerTest.java    |   8 +-
 .../file/ProducerMergedPartitionFileIndexTest.java |   2 +-
 .../ProducerMergedPartitionFileReaderTest.java     | 132 ++++---
 .../file/SegmentPartitionFileReaderTest.java       |  25 +-
 .../tiered/file/TestingPartitionFileReader.java    |  21 +-
 20 files changed, 642 insertions(+), 256 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java index 863be5b5b44..f89994c2d44 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java @@ -252,7 +252,7 @@ public final class BufferReaderWriterUtil { } } - static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { + public static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { // the post-checked loop here gets away with one less check in the normal case do { if (channel.read(b) == -1) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index 4b4221225cf..25012ffa8f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -220,10 +220,15 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { } @Override - public long getRegionFileOffset() { + public long getRegionStartOffset() { return regionFileOffset; } + @Override + public long getRegionEndOffset() { + throw new UnsupportedOperationException("This method is not supported."); + } + @Override public int getNumBuffers() { return numBuffers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java index c0d4270e109..77638bb3156 100644 ---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexRegionHelper.java @@ -77,7 +77,10 @@ public interface FileDataIndexRegionHelper<T extends FileDataIndexRegionHelper.R int getFirstBufferIndex(); /** Get the file start offset of this region. */ - long getRegionFileOffset(); + long getRegionStartOffset(); + + /** Get the file end offset of the region. */ + long getRegionEndOffset(); /** Get the number of buffers in this region. */ int getNumBuffers(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java index 1ca689d0c2e..f306e4291b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtils.java @@ -59,7 +59,7 @@ public class FileRegionWriteReadUtils { headerBuffer.clear(); headerBuffer.putInt(region.getFirstBufferIndex()); headerBuffer.putInt(region.getNumBuffers()); - headerBuffer.putLong(region.getRegionFileOffset()); + headerBuffer.putLong(region.getRegionStartOffset()); headerBuffer.flip(); // write payload buffer. @@ -122,7 +122,8 @@ public class FileRegionWriteReadUtils { regionBuffer.clear(); regionBuffer.putInt(region.getFirstBufferIndex()); regionBuffer.putInt(region.getNumBuffers()); - regionBuffer.putLong(region.getRegionFileOffset()); + regionBuffer.putLong(region.getRegionStartOffset()); + regionBuffer.putLong(region.getRegionEndOffset()); regionBuffer.flip(); BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), regionBuffer); } @@ -145,6 +146,8 @@ public class FileRegionWriteReadUtils { int firstBufferIndex = regionBuffer.getInt(); int numBuffers = regionBuffer.getInt(); long firstBufferOffset = regionBuffer.getLong(); - return new FixedSizeRegion(firstBufferIndex, firstBufferOffset, numBuffers); + long lastBufferEndOffset = regionBuffer.getLong(); + return new FixedSizeRegion( + firstBufferIndex, firstBufferOffset, lastBufferEndOffset, numBuffers); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java index 0a868c9002a..25920194597 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java @@ -21,12 +21,14 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import javax.annotation.Nullable; import java.io.IOException; +import java.util.List; /** {@link 
PartitionFileReader} defines the read logic for different types of shuffle files. */ public interface PartitionFileReader { @@ -40,16 +42,25 @@ public interface PartitionFileReader { * @param bufferIndex the index of buffer * @param memorySegment the empty buffer to store the read buffer * @param recycler the buffer recycler - * @return null if there is no data otherwise a buffer. + * @param readProgress the current read progress. The progress comes from the previous + * ReadBufferResult. Note that the read progress should be implemented and provided by + * Flink, and it should be directly tied to the file format. The field can be null if the + * current file reader has no read progress + * @param partialBuffer the previous partial buffer. The partial buffer is non-null only when + * the last read produced a partial buffer; it will be completed into a full buffer during + * the current read. + * @return null if there is no data, otherwise a read buffer result. */ @Nullable - Buffer readBuffer( + ReadBufferResult readBuffer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, int bufferIndex, MemorySegment memorySegment, - BufferRecycler recycler) + BufferRecycler recycler, + @Nullable ReadProgress readProgress, + @Nullable CompositeBuffer partialBuffer) throws IOException; /** @@ -68,14 +79,74 @@ * @param subpartitionId the subpartition id of the buffer * @param segmentId the segment id of the buffer * @param bufferIndex the index of buffer + * @param readProgress the current read progress. The progress comes from the previous + * ReadBufferResult. Note that the read progress should be implemented and provided by + * Flink, and it should be directly tied to the file format. The field can be null if the + * current file reader has no read progress * @return the priority of the {@link PartitionFileReader}. */ long getPriority( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, - int bufferIndex); + int bufferIndex, + @Nullable ReadProgress readProgress); /** Release the {@link PartitionFileReader}. */ void release(); + + /** + * This {@link ReadProgress} defines the read progress of the {@link PartitionFileReader}. + * + * <p>Note that the implementation of the interface should be strongly bound to the implementation + * of {@link PartitionFileReader}. + */ + interface ReadProgress {} + + /** + * A wrapper class of the reading buffer result, including the read buffers, the hint about + * whether to continue reading, the read progress, etc. + */ + class ReadBufferResult { + + /** The read buffers. */ + private final List<Buffer> readBuffers; + + /** + * A hint to determine whether the caller may continue reading the following buffers. Note + * that this hint is merely a recommendation and not obligatory. Following the hint while + * reading buffers may improve performance. + */ + private final boolean continuousReadSuggested; + + /** + * The read progress state. + * + * <p>Note that the field can be null if the current file reader has no read progress + * state when reading buffers.
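As an illustration of this contract (an editor's sketch, not part of this patch): a caller threads the progress and any partial buffer through successive readBuffer calls, in the style of the DiskIOScheduler changes further below. Here reader, acquireSegment() and consume() are assumed placeholders:

    PartitionFileReader.ReadProgress progress = null;
    CompositeBuffer partial = null;
    boolean continueReading = true;
    while (continueReading) {
        MemorySegment segment = acquireSegment(); // assumed pooling helper
        PartitionFileReader.ReadBufferResult result =
                reader.readBuffer(partitionId, subpartitionId, segmentId, nextBufferIndex,
                        segment, recycler, progress, partial);
        if (result == null) {
            break; // no data for this buffer index
        }
        progress = result.getReadProgress();
        continueReading = result.continuousReadSuggested();
        partial = null;
        for (Buffer buffer : result.getReadBuffers()) {
            if (buffer instanceof CompositeBuffer
                    && ((CompositeBuffer) buffer).missingLength() > 0) {
                partial = (CompositeBuffer) buffer; // incomplete tail, feed into the next call
            } else {
                nextBufferIndex++;
                consume(buffer); // assumed downstream handler
            }
        }
    }
    // (Error handling and recycling of a leftover partial buffer are omitted for brevity.)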
+ */ + @Nullable private final ReadProgress readProgress; + + public ReadBufferResult( + List<Buffer> readBuffers, + boolean continuousReadSuggested, + @Nullable ReadProgress readProgress) { + this.readBuffers = readBuffers; + this.continuousReadSuggested = continuousReadSuggested; + this.readProgress = readProgress; + } + + public List<Buffer> getReadBuffers() { + return readBuffers; + } + + public boolean continuousReadSuggested() { + return continuousReadSuggested; + } + + @Nullable + public ReadProgress getReadProgress() { + return readProgress; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java index 01cc9086d56..d6a08da5fb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java @@ -173,6 +173,8 @@ public class ProducerMergedPartitionFileIndex { new FixedSizeRegion( firstBufferInRegion.getBufferIndex(), firstBufferInRegion.getFileOffset(), + lastBufferInRegion.getFileOffset() + + lastBufferInRegion.getBufferSizeBytes(), lastBufferInRegion.getBufferIndex() - firstBufferInRegion.getBufferIndex() + 1)); @@ -193,10 +195,13 @@ public class ProducerMergedPartitionFileIndex { /** The file offset that the buffer begin with. */ private final long fileOffset; - FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset) { + private final long bufferSizeBytes; + + FlushedBuffer(int subpartitionId, int bufferIndex, long fileOffset, long bufferSizeBytes) { this.subpartitionId = subpartitionId; this.bufferIndex = bufferIndex; this.fileOffset = fileOffset; + this.bufferSizeBytes = bufferSizeBytes; } int getSubpartitionId() { @@ -210,6 +215,10 @@ public class ProducerMergedPartitionFileIndex { long getFileOffset() { return fileOffset; } + + long getBufferSizeBytes() { + return bufferSizeBytes; + } } /** @@ -257,20 +266,28 @@ public class ProducerMergedPartitionFileIndex { */ public static class FixedSizeRegion implements FileDataIndexRegionHelper.Region { - public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + Integer.BYTES; + public static final int REGION_SIZE = + Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES; /** The buffer index of first buffer. */ private final int firstBufferIndex; /** The file offset of the region. */ - private final long regionFileOffset; + private final long regionStartOffset; + + private final long regionEndOffset; /** The number of buffers that the region contains. 
*/ private final int numBuffers; - public FixedSizeRegion(int firstBufferIndex, long regionFileOffset, int numBuffers) { + public FixedSizeRegion( + int firstBufferIndex, + long regionStartOffset, + long regionEndOffset, + int numBuffers) { this.firstBufferIndex = firstBufferIndex; - this.regionFileOffset = regionFileOffset; + this.regionStartOffset = regionStartOffset; + this.regionEndOffset = regionEndOffset; this.numBuffers = numBuffers; } @@ -286,8 +303,13 @@ public class ProducerMergedPartitionFileIndex { } @Override - public long getRegionFileOffset() { - return regionFileOffset; + public long getRegionStartOffset() { + return regionStartOffset; + } + + @Override + public long getRegionEndOffset() { + return regionEndOffset; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java index 00b8bd49db4..702a67213cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java @@ -22,25 +22,34 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferHeader; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.HashMap; -import java.util.Map; +import java.util.LinkedList; +import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer; -import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; /** * The implementation of {@link PartitionFileReader} with producer-merge mode. In this mode, the @@ -51,25 +60,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ProducerMergedPartitionFileReader implements PartitionFileReader { - /** - * Max number of caches. 
- * - * <p>The constant defines the maximum number of caches that can be created. Its value is set to - * 10000, which is considered sufficient for most parallel jobs. Each cache only contains - * references and numerical variables and occupies a minimal amount of memory so the value is - * not excessively large. - */ - private static final int DEFAULT_MAX_CACHE_NUM = 10000; - - /** - * Buffer offset caches stored in map. - * - * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The - * value is the buffer offset cache, which includes file offset of the buffer index, the region - * containing the buffer index and next buffer index to consume. - */ - private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache> - bufferOffsetCaches; + private static final Logger LOG = + LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class); private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); @@ -77,62 +69,63 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader { private final ProducerMergedPartitionFileIndex dataIndex; - private final int maxCacheNumber; - private volatile FileChannel fileChannel; - /** The current number of caches. */ - private int numCaches; - - ProducerMergedPartitionFileReader( - Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) { - this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM); - } - @VisibleForTesting ProducerMergedPartitionFileReader( - Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int maxCacheNumber) { + Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) { this.dataFilePath = dataFilePath; this.dataIndex = dataIndex; - this.bufferOffsetCaches = new HashMap<>(); - this.maxCacheNumber = maxCacheNumber; } @Override - public Buffer readBuffer( + public ReadBufferResult readBuffer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, int bufferIndex, MemorySegment memorySegment, - BufferRecycler recycler) + BufferRecycler recycler, + @Nullable ReadProgress readProgress, + @Nullable CompositeBuffer partialBuffer) throws IOException { lazyInitializeFileChannel(); - Tuple2<TieredStorageSubpartitionId, Integer> cacheKey = - Tuple2.of(subpartitionId, bufferIndex); - Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true); - if (!cache.isPresent()) { + + // Get the read offset, including the start offset, the end offset + Tuple2<Long, Long> startAndEndOffset = + getReadStartAndEndOffset(subpartitionId, bufferIndex, readProgress, partialBuffer); + if (startAndEndOffset == null) { return null; } - fileChannel.position(cache.get().getFileOffset()); - Buffer buffer = - readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler); - boolean hasNextBuffer = - cache.get() - .advance( - checkNotNull(buffer).readableBytes() - + BufferReaderWriterUtil.HEADER_LENGTH); - if (hasNextBuffer) { - int nextBufferIndex = bufferIndex + 1; - // TODO: introduce the LRU cache strategy in the future to restrict the total - // cache number. Testing to prevent cache leaks has been implemented. 
- if (numCaches < maxCacheNumber) { - bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get()); - numCaches++; - } + long readStartOffset = startAndEndOffset.f0; + long readEndOffset = startAndEndOffset.f1; + + int numBytesToRead = + Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset)); + + if (numBytesToRead == 0) { + return null; } - return buffer; + + List<Buffer> readBuffers = new LinkedList<>(); + ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead); + fileChannel.position(readStartOffset); + // Read data to the memory segment; note that the read size is numBytesToRead + readFileDataToBuffer(memorySegment, recycler, byteBuffer); + + // Slice the read memory segment to multiple small network buffers and add them to + // readBuffers + Tuple2<Integer, Integer> partial = + sliceBuffer(byteBuffer, memorySegment, partialBuffer, recycler, readBuffers); + + return getReadBufferResult( + readBuffers, + readStartOffset, + readEndOffset, + numBytesToRead, + partial.f0, + partial.f1); } @Override @@ -140,12 +133,18 @@ public class ProducerMergedPartitionFileReader implements PartitionFileReader { TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, - int bufferIndex) { + int bufferIndex, + @Nullable ReadProgress readProgress) { lazyInitializeFileChannel(); - Tuple2<TieredStorageSubpartitionId, Integer> cacheKey = - Tuple2.of(subpartitionId, bufferIndex); - return tryGetCache(cacheKey, false) - .map(BufferOffsetCache::getFileOffset) + + ProducerMergedReadProgress progress = convertToCurrentReadProgress(readProgress); + if (progress != null + && progress.getCurrentBufferOffset() != progress.getEndOfRegionOffset()) { + return progress.getCurrentBufferOffset(); + } + return dataIndex + .getRegion(subpartitionId, bufferIndex) + .map(ProducerMergedPartitionFileIndex.FixedSizeRegion::getRegionStartOffset) .orElse(Long.MAX_VALUE); } @@ -176,91 +175,217 @@ } /** - * Try to get the cache according to the key. + * Slice the read memory segment to multiple small network buffers. * - * <p>If the relevant buffer offset cache exists, it will be returned and subsequently removed. - * However, if the buffer offset cache does not exist, a new cache will be created using the - * data index and returned. + * <p>Note that although the data is sliced into multiple buffers, the sliced + * buffers still share the same underlying memory segment.
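To make the slicing scheme concrete, here is a self-contained sketch (an editor's example, not the patch's code; it assumes a simplified 4-byte length header, whereas the real format uses an 8-byte header carrying data type, compression flag, and length): one large chunk is read from the file and cut into zero-copy slices, and a record cut off at the end of the chunk is carried over as a partial to be completed by the next read.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    final class SliceSketch {
        /**
         * Slices one large chunk into [length][payload] records. Any bytes left in the chunk
         * afterwards belong to an unfinished record; the caller prepends them to the next chunk.
         */
        static List<ByteBuffer> slice(ByteBuffer chunk) {
            List<ByteBuffer> records = new ArrayList<>();
            while (chunk.remaining() >= Integer.BYTES) {
                int length = chunk.getInt(chunk.position()); // peek the header
                if (chunk.remaining() - Integer.BYTES < length) {
                    break; // incomplete record: keep it (header included) as the partial
                }
                chunk.getInt(); // consume the header
                ByteBuffer record = chunk.slice(); // zero-copy view into the big chunk
                record.limit(length);
                records.add(record);
                chunk.position(chunk.position() + length);
            }
            return records;
        }
    }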
 * - * @param cacheKey the key of cache. - * @param removeKey boolean decides whether to remove key. - * @return returns the relevant buffer offset cache if it exists, otherwise return {@link - * Optional#empty()}. + * @param byteBuffer the byte buffer to be sliced; it points to the underlying memorySegment + * @param memorySegment the underlying memory segment to be sliced + * @param partialBuffer the partial buffer; if it is not null, it contains the + * partial data buffer from the previous read + * @param readBuffers the list that accepts the sliced buffers + * @return a tuple whose first field is the total number of sliced bytes and whose second field + * is the number of bytes in the partial buffer */ - private Optional<BufferOffsetCache> tryGetCache( - Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean removeKey) { - BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey); - if (bufferOffsetCache == null) { - Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt = - dataIndex.getRegion(cacheKey.f0, cacheKey.f1); - return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region)); - } else { - if (removeKey) { - numCaches--; - } else { - bufferOffsetCaches.put(cacheKey, bufferOffsetCache); + private Tuple2<Integer, Integer> sliceBuffer( + ByteBuffer byteBuffer, + MemorySegment memorySegment, + @Nullable CompositeBuffer partialBuffer, + BufferRecycler bufferRecycler, + List<Buffer> readBuffers) { + checkState(reusedHeaderBuffer.position() == 0); + checkState(partialBuffer == null || partialBuffer.missingLength() > 0); + + NetworkBuffer buffer = new NetworkBuffer(memorySegment, bufferRecycler); + buffer.setSize(byteBuffer.remaining()); + + try { + int numSlicedBytes = 0; + if (partialBuffer != null) { + // If there is a previous small partial buffer, the current read operation should + // read additional data and combine it with the existing partial to construct a new + // complete buffer + buffer.retainBuffer(); + int position = byteBuffer.position() + partialBuffer.missingLength(); + int numPartialBytes = partialBuffer.missingLength(); + partialBuffer.addPartialBuffer( + buffer.readOnlySlice(byteBuffer.position(), numPartialBytes)); + numSlicedBytes += numPartialBytes; + byteBuffer.position(position); + readBuffers.add(partialBuffer); + } + + partialBuffer = null; + while (byteBuffer.hasRemaining()) { + // Parse the small buffer's header + BufferHeader header = parseBufferHeader(byteBuffer); + if (header == null) { + // If the remaining data length in the buffer is not enough to construct a new + // complete buffer header, drop it directly. + break; + } else { + numSlicedBytes += HEADER_LENGTH; + } + + if (header.getLength() <= byteBuffer.remaining()) { + // The remaining data length in the buffer is enough to generate a new small + // sliced network buffer. The small sliced buffer is not a partial buffer, so we + // can read the slice of the buffer directly + buffer.retainBuffer(); + ReadOnlySlicedNetworkBuffer slicedBuffer = + buffer.readOnlySlice(byteBuffer.position(), header.getLength()); + slicedBuffer.setDataType(header.getDataType()); + slicedBuffer.setCompressed(header.isCompressed()); + byteBuffer.position(byteBuffer.position() + header.getLength()); + numSlicedBytes += header.getLength(); + readBuffers.add(slicedBuffer); + } else { + // The remaining data length in the buffer is smaller than the actual length of + // the buffer, so we should generate a new partial buffer, allowing a new + // complete buffer to be generated during the next read operation + buffer.retainBuffer(); + int numPartialBytes = byteBuffer.remaining(); + numSlicedBytes += numPartialBytes; + partialBuffer = new CompositeBuffer(header); + partialBuffer.addPartialBuffer( + buffer.readOnlySlice(byteBuffer.position(), numPartialBytes)); + readBuffers.add(partialBuffer); + break; + } } - return Optional.of(bufferOffsetCache); + return Tuple2.of(numSlicedBytes, getPartialBufferReadBytes(partialBuffer)); + } catch (Throwable throwable) { + LOG.error("Failed to slice the read buffer {}.", byteBuffer, throwable); + throw throwable; + } finally { + buffer.recycleBuffer(); + } } /** - * The {@link BufferOffsetCache} represents the file offset cache for a buffer index. Each cache - * includes file offset of the buffer index, the region containing the buffer index and next - * buffer index to consume. + * Return a tuple of the start and end file offset, or return null if the buffer is not found in + * the data index. */ - private class BufferOffsetCache { + @Nullable + private Tuple2<Long, Long> getReadStartAndEndOffset( + TieredStorageSubpartitionId subpartitionId, + int bufferIndex, + @Nullable ReadProgress currentReadProgress, + @Nullable CompositeBuffer partialBuffer) { + ProducerMergedReadProgress readProgress = convertToCurrentReadProgress(currentReadProgress); + long readStartOffset; + long readEndOffset; + if (readProgress == null + || readProgress.getCurrentBufferOffset() == readProgress.getEndOfRegionOffset()) { + Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt = + dataIndex.getRegion(subpartitionId, bufferIndex); + if (!regionOpt.isPresent()) { + return null; + } + readStartOffset = regionOpt.get().getRegionStartOffset(); + readEndOffset = regionOpt.get().getRegionEndOffset(); + } else { + readStartOffset = + readProgress.getCurrentBufferOffset() + + getPartialBufferReadBytes(partialBuffer); + readEndOffset = readProgress.getEndOfRegionOffset(); + } - private final ProducerMergedPartitionFileIndex.FixedSizeRegion region; + checkState(readStartOffset <= readEndOffset); + return Tuple2.of(readStartOffset, readEndOffset); + } - private long fileOffset; + private static ReadBufferResult getReadBufferResult( + List<Buffer> readBuffers, + long readStartOffset, + long readEndOffset, + int numBytesToRead, + int numBytesRealRead, + int numBytesReadPartialBuffer) { + boolean shouldContinueRead = readStartOffset + numBytesRealRead < readEndOffset; + ProducerMergedReadProgress readProgress = + new ProducerMergedReadProgress( + readStartOffset + numBytesRealRead - numBytesReadPartialBuffer, + readEndOffset); + checkState( + numBytesRealRead <= numBytesToRead + && numBytesToRead - numBytesRealRead < HEADER_LENGTH); - private int nextBufferIndex; + return new ReadBufferResult(readBuffers, shouldContinueRead, readProgress); } - private BufferOffsetCache( - int
bufferIndex, ProducerMergedPartitionFileIndex.FixedSizeRegion region) { - this.nextBufferIndex = bufferIndex; - this.region = region; - moveFileOffsetToBuffer(bufferIndex); + private void readFileDataToBuffer( + MemorySegment memorySegment, BufferRecycler recycler, ByteBuffer byteBuffer) + throws IOException { + try { + BufferReaderWriterUtil.readByteBufferFully(fileChannel, byteBuffer); + byteBuffer.flip(); + } catch (Throwable throwable) { + recycler.recycle(memorySegment); + throw throwable; } + } - /** - * Get the file offset. - * - * @return the file offset. - */ - private long getFileOffset() { - return fileOffset; + private static int getPartialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) { + return partialBuffer == null ? 0 : partialBuffer.readableBytes() + HEADER_LENGTH; + } + + private static ProducerMergedReadProgress convertToCurrentReadProgress( + @Nullable ReadProgress readProgress) { + if (readProgress == null) { + return null; } + checkState(readProgress instanceof ProducerMergedReadProgress); + return (ProducerMergedReadProgress) readProgress; + } - /** - * Updates the {@link BufferOffsetCache} upon the retrieval of a buffer from the file using - * the file offset in the {@link BufferOffsetCache}. - * - * @param bufferSize denotes the size of the buffer. - * @return return true if there are remaining buffers in the region, otherwise return false. - */ - private boolean advance(long bufferSize) { - nextBufferIndex++; - fileOffset += bufferSize; - return nextBufferIndex < (region.getFirstBufferIndex() + region.getNumBuffers()); + private BufferHeader parseBufferHeader(ByteBuffer buffer) { + checkArgument(reusedHeaderBuffer.position() == 0); + + BufferHeader header = null; + try { + if (buffer.remaining() >= HEADER_LENGTH) { + // The remaining data length in the buffer is enough to construct a new complete + // buffer, so parse and create a new buffer header + header = BufferReaderWriterUtil.parseBufferHeader(buffer); + } + // If the remaining data length in the buffer is smaller than the header, drop it + // directly. + } catch (Throwable throwable) { + reusedHeaderBuffer.clear(); + LOG.error("Failed to parse buffer header.", throwable); + throw throwable; } + reusedHeaderBuffer.clear(); + return header; + } + /** + * The implementation of {@link PartitionFileReader.ReadProgress}. It mainly tracks the + * current reading offset and the end-of-region offset. + */ + public static class ProducerMergedReadProgress implements PartitionFileReader.ReadProgress { /** - * Relocates the file channel offset to the position of the specified buffer index. - * - * @param bufferIndex denotes the index of the buffer. + * The current reading buffer file offset. Note that the offset does not include the length of + * the partial buffer, because the partial buffer may be dropped at any time. */ - private void moveFileOffsetToBuffer(int bufferIndex) { - try { - checkNotNull(fileChannel).position(region.getRegionFileOffset()); - for (int i = 0; i < (bufferIndex - region.getFirstBufferIndex()); ++i) { - positionToNextBuffer(fileChannel, reusedHeaderBuffer); - } - fileOffset = fileChannel.position(); - } catch (IOException e) { - ExceptionUtils.rethrow(e, "Failed to move file offset"); - } + private final long currentBufferOffset; + + /** The end of region file offset.
*/ + private final long endOfRegionOffset; + + public ProducerMergedReadProgress(long currentBufferOffset, long endOfRegionOffset) { + this.currentBufferOffset = currentBufferOffset; + this.endOfRegionOffset = endOfRegionOffset; + } + + public long getCurrentBufferOffset() { + return currentBufferOffset; + } + + public long getEndOfRegionOffset() { + return endOfRegionOffset; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java index e30def7fcd0..57ce19eb2c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileWriter.java @@ -156,7 +156,8 @@ public class ProducerMergedPartitionFileWriter implements PartitionFileWriter { new ProducerMergedPartitionFileIndex.FlushedBuffer( subpartitionId, bufferWithIndex.f1, - totalBytesWritten + expectedBytes)); + totalBytesWritten + expectedBytes, + buffer.readableBytes() + BufferReaderWriterUtil.HEADER_LENGTH)); expectedBytes += buffer.readableBytes() + BufferReaderWriterUtil.HEADER_LENGTH; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java index 9f79a78cbd8..8c07c505949 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.java @@ -25,17 +25,21 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferHeader; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.util.ExceptionUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -74,13 +78,15 @@ public class SegmentPartitionFileReader implements PartitionFileReader { } @Override - public Buffer readBuffer( + public ReadBufferResult readBuffer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, int bufferIndex, MemorySegment memorySegment, - BufferRecycler recycler) + BufferRecycler recycler, + @Nullable ReadProgress readProgress, + @Nullable CompositeBuffer partialBuffer) throws IOException { // Get the channel of the segment file for a subpartition. 
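The next hunk changes how the end of a segment is reported. As a rough standalone illustration of the convention (an editor's sketch assuming a simplified 4-byte length header, not Flink's actual 8-byte one): hitting EOF while fetching the next header means the segment is exhausted, which the reader surfaces as an END_OF_SEGMENT buffer rather than an error.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    final class SegmentReadSketch {
        /** Returns the next record payload, or null to signal end-of-segment. */
        static ByteBuffer readNext(FileChannel channel) throws IOException {
            ByteBuffer header = ByteBuffer.allocate(Integer.BYTES);
            if (channel.read(header) == -1) {
                return null; // EOF on the header read: segment exhausted, emit END_OF_SEGMENT
            }
            while (header.hasRemaining()) { // finish a short header read
                if (channel.read(header) == -1) {
                    throw new IOException("Premature end of segment file.");
                }
            }
            header.flip();
            ByteBuffer payload = ByteBuffer.allocate(header.getInt());
            while (payload.hasRemaining()) {
                if (channel.read(payload) == -1) {
                    throw new IOException("The length of data buffer is illegal.");
                }
            }
            payload.flip();
            return payload;
        }
    }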
@@ -109,7 +115,8 @@ public class SegmentPartitionFileReader implements PartitionFileReader { if (bufferHeaderResult == -1) { channel.close(); openedChannelAndSegmentIds.get(partitionId).remove(subpartitionId); - return new NetworkBuffer(memorySegment, recycler, Buffer.DataType.END_OF_SEGMENT); + return getSingletonReadResult( + new NetworkBuffer(memorySegment, recycler, Buffer.DataType.END_OF_SEGMENT)); } reusedHeaderBuffer.flip(); BufferHeader header = parseBufferHeader(reusedHeaderBuffer); @@ -119,8 +126,13 @@ public class SegmentPartitionFileReader implements PartitionFileReader { throw new IOException("The length of data buffer is illegal."); } Buffer.DataType dataType = header.getDataType(); - return new NetworkBuffer( - memorySegment, recycler, dataType, header.isCompressed(), header.getLength()); + return getSingletonReadResult( + new NetworkBuffer( + memorySegment, + recycler, + dataType, + header.isCompressed(), + header.getLength())); } @Override @@ -128,7 +140,8 @@ public class SegmentPartitionFileReader implements PartitionFileReader { TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, int segmentId, - int bufferIndex) { + int bufferIndex, + @Nullable ReadProgress readProgress) { // noop return -1; } @@ -166,4 +179,8 @@ public class SegmentPartitionFileReader implements PartitionFileReader { } }); } + + private static ReadBufferResult getSingletonReadResult(NetworkBuffer buffer) { + return new ReadBufferResult(Collections.singletonList(buffer), false, null); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java index 5e8b405c531..bceb5e55c0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java @@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader; @@ -35,6 +36,7 @@ import org.apache.flink.util.FatalExitExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; @@ -338,6 +340,8 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr private boolean isFailed; + @Nullable private PartitionFileReader.ReadProgress readProgress; + private ScheduledSubpartitionReader( TieredStorageSubpartitionId subpartitionId, NettyConnectionWriter nettyConnectionWriter) { @@ -354,34 +358,46 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr + subpartitionId + " has already been failed."); } - while (!buffers.isEmpty() - && nettyConnectionWriter.numQueuedBufferPayloads() < maxBufferReadAhead - && nextSegmentId >= 0) { - 
MemorySegment memorySegment = buffers.poll(); - Buffer buffer; - try { - if ((buffer = - partitionFileReader.readBuffer( - partitionId, - subpartitionId, - nextSegmentId, - nextBufferIndex, - memorySegment, - recycler)) - == null) { + + CompositeBuffer partialBuffer = null; + boolean shouldContinueRead = true; + try { + while (!buffers.isEmpty() && shouldContinueRead && nextSegmentId >= 0) { + MemorySegment memorySegment = buffers.poll(); + PartitionFileReader.ReadBufferResult readBufferResult; + try { + readBufferResult = + partitionFileReader.readBuffer( + partitionId, + subpartitionId, + nextSegmentId, + nextBufferIndex, + memorySegment, + recycler, + readProgress, + partialBuffer); + if (readBufferResult == null) { + buffers.add(memorySegment); + break; + } + } catch (Throwable throwable) { + buffers.add(memorySegment); + throw throwable; + } + + List<Buffer> readBuffers = readBufferResult.getReadBuffers(); + shouldContinueRead = readBufferResult.continuousReadSuggested(); + readProgress = readBufferResult.getReadProgress(); + if (readBuffers.isEmpty()) { buffers.add(memorySegment); break; } - } catch (Throwable throwable) { - buffers.add(memorySegment); - throw throwable; + + partialBuffer = writeFullBuffersAndGetPartialBuffer(readBuffers); } - writeToNettyConnectionWriter( - NettyPayload.newBuffer( - buffer, nextBufferIndex++, subpartitionId.getSubpartitionId())); - if (buffer.getDataType() == Buffer.DataType.END_OF_SEGMENT) { - nextSegmentId = -1; - updateSegmentId(); + } finally { + if (partialBuffer != null) { + partialBuffer.recycleBuffer(); } } } @@ -400,7 +416,39 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr nextSegmentId < 0 ? Long.MAX_VALUE : partitionFileReader.getPriority( - partitionId, subpartitionId, nextSegmentId, nextBufferIndex); + partitionId, + subpartitionId, + nextSegmentId, + nextBufferIndex, + readProgress); + } + + private CompositeBuffer writeFullBuffersAndGetPartialBuffer(List<Buffer> readBuffers) { + CompositeBuffer partialBuffer = null; + for (int i = 0; i < readBuffers.size(); i++) { + Buffer readBuffer = readBuffers.get(i); + if (i == readBuffers.size() - 1 && isPartialBuffer(readBuffer)) { + partialBuffer = (CompositeBuffer) readBuffer; + continue; + } + writeNettyBufferAndUpdateSegmentId(readBuffer); + } + return partialBuffer; + } + + private boolean isPartialBuffer(Buffer readBuffer) { + return readBuffer instanceof CompositeBuffer + && ((CompositeBuffer) readBuffer).missingLength() > 0; + } + + private void writeNettyBufferAndUpdateSegmentId(Buffer readBuffer) { + writeToNettyConnectionWriter( + NettyPayload.newBuffer( + readBuffer, nextBufferIndex++, subpartitionId.getSubpartitionId())); + if (readBuffer.getDataType() == Buffer.DataType.END_OF_SEGMENT) { + nextSegmentId = -1; + updateSegmentId(); + } } private void writeToNettyConnectionWriter(NettyPayload nettyPayload) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java index a43d3bd304b..12d046e8325 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java @@ -32,9 +32,13 @@ import org.apache.flink.util.ExceptionUtils; import 
java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** The data client is used to fetch data from remote tier. */ public class RemoteTierConsumerAgent implements TierConsumerAgent { @@ -87,21 +91,26 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent { // Read buffer from the partition file in remote storage. MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(bufferSizeBytes); - Buffer buffer = null; + PartitionFileReader.ReadBufferResult readBufferResult = null; try { - buffer = + readBufferResult = partitionFileReader.readBuffer( partitionId, subpartitionId, segmentId, currentBufferIndex, memorySegment, - FreeingBufferRecycler.INSTANCE); + FreeingBufferRecycler.INSTANCE, + null, + null); } catch (IOException e) { memorySegment.free(); ExceptionUtils.rethrow(e, "Failed to read buffer from partition file."); } - if (buffer != null) { + List<Buffer> readBuffers = checkNotNull(readBufferResult).getReadBuffers(); + if (!readBuffers.isEmpty()) { + checkState(readBuffers.size() == 1); + Buffer buffer = readBuffers.get(0); currentBufferIndexAndSegmentIds .get(partitionId) .put(subpartitionId, Tuple2.of(++currentBufferIndex, segmentId)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java index d9abab0b139..24d81230b30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java @@ -119,15 +119,18 @@ public class HybridShuffleTestUtils { } public static FileDataIndexRegionHelper.Region createSingleFixedSizeRegion( - int firstBufferIndex, long firstBufferOffset, int numBuffersPerRegion) { + int firstBufferIndex, + long firstBufferOffset, + long lastBufferEndOffset, + int numBuffersPerRegion) { return new ProducerMergedPartitionFileIndex.FixedSizeRegion( - firstBufferIndex, firstBufferOffset, numBuffersPerRegion); + firstBufferIndex, firstBufferOffset, lastBufferEndOffset, numBuffersPerRegion); } public static void assertRegionEquals( FileDataIndexRegionHelper.Region expected, FileDataIndexRegionHelper.Region region) { assertThat(region.getFirstBufferIndex()).isEqualTo(expected.getFirstBufferIndex()); - assertThat(region.getRegionFileOffset()).isEqualTo(expected.getRegionFileOffset()); + assertThat(region.getRegionStartOffset()).isEqualTo(expected.getRegionStartOffset()); assertThat(region.getNumBuffers()).isEqualTo(expected.getNumBuffers()); if (expected instanceof InternalRegion) { assertThat(region).isInstanceOf(InternalRegion.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java index 1ffd3620975..76ef63838af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java @@ -68,7 +68,7 @@ class FileDataIndexCacheTest { 
.hasValueSatisfying( (region) -> { assertThat(region.getFirstBufferIndex()).isEqualTo(0); - assertThat(region.getRegionFileOffset()).isEqualTo(0); + assertThat(region.getRegionStartOffset()).isEqualTo(0); assertThat(region.getNumBuffers()).isEqualTo(3); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java index 8e04f24186e..4237aeff1db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileRegionWriteReadUtilsTest.java @@ -36,6 +36,7 @@ import java.util.UUID; import static org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleFixedSizeRegion; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex.FixedSizeRegion.REGION_SIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -83,9 +84,9 @@ class FileRegionWriteReadUtilsTest { @Test void testReadPrematureEndOfFileForFixedSizeRegion(@TempDir Path tmpPath) throws Exception { FileChannel channel = tmpFileChannel(tmpPath); - ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(REGION_SIZE); FileRegionWriteReadUtils.writeFixedSizeRegionToFile( - channel, buffer, createSingleFixedSizeRegion(0, 0L, 1)); + channel, buffer, createSingleFixedSizeRegion(0, 0L, 10L, 1)); channel.truncate(channel.position() - 1); buffer.flip(); assertThatThrownBy( @@ -98,8 +99,8 @@ class FileRegionWriteReadUtilsTest { @Test void testWriteAndReadFixedSizeRegion(@TempDir Path tmpPath) throws Exception { FileChannel channel = tmpFileChannel(tmpPath); - ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); - FileDataIndexRegionHelper.Region region = createSingleFixedSizeRegion(10, 100L, 1); + ByteBuffer buffer = FileRegionWriteReadUtils.allocateAndConfigureBuffer(REGION_SIZE); + FileDataIndexRegionHelper.Region region = createSingleFixedSizeRegion(10, 100L, 110L, 1); FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, buffer, region); buffer.flip(); ProducerMergedPartitionFileIndex.FixedSizeRegion readRegion = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java index 29ee310fba8..3fd42272a38 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/TestingFileDataIndexRegion.java @@ -39,6 +39,8 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg private final Supplier<Long> getRegionFileOffsetSupplier; + private final Supplier<Long> getRegionFileEndOffsetSupplier; + 
private final Supplier<Integer> getNumBuffersSupplier; private final Function<Integer, Boolean> containBufferFunction; @@ -47,11 +49,13 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg Supplier<Integer> getSizeSupplier, Supplier<Integer> getFirstBufferIndexSupplier, Supplier<Long> getRegionFileOffsetSupplier, + Supplier<Long> getRegionFileEndOffsetSupplier, Supplier<Integer> getNumBuffersSupplier, Function<Integer, Boolean> containBufferFunction) { this.getSizeSupplier = getSizeSupplier; this.getFirstBufferIndexSupplier = getFirstBufferIndexSupplier; this.getRegionFileOffsetSupplier = getRegionFileOffsetSupplier; + this.getRegionFileEndOffsetSupplier = getRegionFileEndOffsetSupplier; this.getNumBuffersSupplier = getNumBuffersSupplier; this.containBufferFunction = containBufferFunction; } @@ -67,10 +71,15 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg } @Override - public long getRegionFileOffset() { + public long getRegionStartOffset() { return getRegionFileOffsetSupplier.get(); } + @Override + public long getRegionEndOffset() { + return getRegionFileEndOffsetSupplier.get(); + } + @Override public int getNumBuffers() { return getNumBuffersSupplier.get(); @@ -87,7 +96,7 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg regionBuffer.clear(); regionBuffer.putInt(region.getFirstBufferIndex()); regionBuffer.putInt(region.getNumBuffers()); - regionBuffer.putLong(region.getRegionFileOffset()); + regionBuffer.putLong(region.getRegionStartOffset()); regionBuffer.flip(); BufferReaderWriterUtil.writeBuffers(channel, regionBuffer.capacity(), regionBuffer); } @@ -130,6 +139,8 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg private Supplier<Long> getRegionFileOffsetSupplier = () -> 0L; + private Supplier<Long> getRegionFileEndOffsetSupplier = () -> 0L; + private Supplier<Integer> getNumBuffersSupplier = () -> 0; private Function<Integer, Boolean> containBufferFunction = bufferIndex -> false; @@ -152,6 +163,12 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg return this; } + public TestingFileDataIndexRegion.Builder setGetRegionFileEndOffsetSupplier( + Supplier<Long> getRegionFileEndOffsetSupplier) { + this.getRegionFileEndOffsetSupplier = getRegionFileEndOffsetSupplier; + return this; + } + public TestingFileDataIndexRegion.Builder setGetNumBuffersSupplier( Supplier<Integer> getNumBuffersSupplier) { this.getNumBuffersSupplier = getNumBuffersSupplier; @@ -169,6 +186,7 @@ public class TestingFileDataIndexRegion implements FileDataIndexRegionHelper.Reg getSizeSupplier, getFirstBufferIndexSupplier, getRegionFileOffsetSupplier, + getRegionFileEndOffsetSupplier, getNumBuffersSupplier, containBufferFunction); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java index 98bccb530f3..8d453d00f2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import 
java.util.Map; @@ -97,7 +98,12 @@ class DiskIOSchedulerTest { .setReadBufferSupplier( (bufferIndex, segmentId) -> { segmentIdFuture.complete(segmentId); - return BufferBuilderTestUtils.buildSomeBuffer(0); + return new PartitionFileReader.ReadBufferResult( + Collections.singletonList( + BufferBuilderTestUtils.buildSomeBuffer( + 0)), + true, + null); }) .setReleaseNotifier(() -> readerReleaseFuture.complete(null)) .setPrioritySupplier(subpartitionId -> (long) subpartitionId) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java index 615e2235643..a382511b53a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndexTest.java @@ -81,7 +81,7 @@ class ProducerMergedPartitionFileIndexTest { boolean isNextRegionContinuous = (j == 0 || random.nextBoolean()) && j != numBuffersPerSubpartition - 1; flushedBuffers.add( - new ProducerMergedPartitionFileIndex.FlushedBuffer(i, bufferIndex, 0)); + new ProducerMergedPartitionFileIndex.FlushedBuffer(i, bufferIndex, 0, 1)); bufferIndex++; if (!isNextRegionContinuous) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java index fc29e07c045..04e808586fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.CompositeBuffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; @@ -35,9 +36,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageTestUtils.generateBuffersToWrite; @@ -54,7 +53,7 @@ class ProducerMergedPartitionFileReaderTest { private static final int DEFAULT_BUFFER_NUMBER = 5; - private static final int DEFAULT_BUFFER_SIZE = 3; + private static final int DEFAULT_BUFFER_SIZE = 10; private static final String DEFAULT_TEST_FILE_NAME = "testFile"; @@ -96,9 +95,9 @@ class ProducerMergedPartitionFileReaderTest { @Test void testReadBuffer() throws IOException { for (int bufferIndex = 0; 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
index fc29e07c045..04e808586fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
@@ -35,9 +36,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
 import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageTestUtils.generateBuffersToWrite;
@@ -54,7 +53,7 @@ class ProducerMergedPartitionFileReaderTest {
 
     private static final int DEFAULT_BUFFER_NUMBER = 5;
 
-    private static final int DEFAULT_BUFFER_SIZE = 3;
+    private static final int DEFAULT_BUFFER_SIZE = 10;
 
     private static final String DEFAULT_TEST_FILE_NAME = "testFile";
 
@@ -96,9 +95,9 @@ class ProducerMergedPartitionFileReaderTest {
     @Test
     void testReadBuffer() throws IOException {
         for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ++bufferIndex) {
-            Buffer buffer = readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID);
-            assertThat(buffer).isNotNull();
-            buffer.recycleBuffer();
+            List<Buffer> buffers = readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID);
+            assertThat(buffers).isNotNull();
+            buffers.forEach(Buffer::recycleBuffer);
         }
         MemorySegment memorySegment =
                 MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
@@ -109,58 +108,90 @@ class ProducerMergedPartitionFileReaderTest {
                                 DEFAULT_SEGMENT_ID,
                                 DEFAULT_BUFFER_NUMBER + 1,
                                 memorySegment,
-                                FreeingBufferRecycler.INSTANCE))
+                                FreeingBufferRecycler.INSTANCE,
+                                null,
+                                null))
                 .isNull();
     }
 
     @Test
     void testGetPriority() throws IOException {
-        int currentFileOffset = 0;
-        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ++bufferIndex) {
-            Buffer buffer = readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID);
-            assertThat(buffer).isNotNull();
+        ProducerMergedPartitionFileReader.ProducerMergedReadProgress readProgress = null;
+        CompositeBuffer partialBuffer = null;
+        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ) {
+            PartitionFileReader.ReadBufferResult readBufferResult =
+                    readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID, readProgress, partialBuffer);
+            assertThat(readBufferResult).isNotNull();
+            assertThat(readBufferResult.getReadProgress())
+                    .isInstanceOf(
+                            ProducerMergedPartitionFileReader.ProducerMergedReadProgress.class);
+            readProgress =
+                    (ProducerMergedPartitionFileReader.ProducerMergedReadProgress)
+                            readBufferResult.getReadProgress();
+            for (Buffer buffer : readBufferResult.getReadBuffers()) {
+                if (buffer instanceof CompositeBuffer) {
+                    partialBuffer = (CompositeBuffer) buffer;
+                    if (partialBuffer.missingLength() == 0) {
+                        bufferIndex++;
+                        partialBuffer.recycleBuffer();
+                        partialBuffer = null;
+                    }
+                } else {
+                    bufferIndex++;
+                    buffer.recycleBuffer();
+                }
+            }
+
+            long expectedBufferOffset;
+            if (bufferIndex < DEFAULT_BUFFER_NUMBER) {
+                expectedBufferOffset =
+                        readProgress == null ? 0 : readProgress.getCurrentBufferOffset();
+            } else {
+                expectedBufferOffset = Long.MAX_VALUE;
+            }
             assertThat(
                             partitionFileReader.getPriority(
                                     DEFAULT_PARTITION_ID,
                                     DEFAULT_SUBPARTITION_ID,
                                     DEFAULT_SEGMENT_ID,
-                                    bufferIndex))
-                    .isEqualTo(currentFileOffset);
-            currentFileOffset += (HEADER_LENGTH + DEFAULT_BUFFER_SIZE);
-            buffer.recycleBuffer();
+                                    bufferIndex,
+                                    readProgress))
+                    .isEqualTo(expectedBufferOffset);
         }
     }
 
     @Test
-    void testCacheExceedMaxNumber() throws IOException {
-        int cacheNumber = 3;
-        AtomicInteger indexQueryTime = new AtomicInteger(0);
-        TestingProducerMergedPartitionFileIndex partitionFileIndex =
-                new TestingProducerMergedPartitionFileIndex.Builder()
-                        .setIndexFilePath(new File(tempFolder.toFile(), "test-Index").toPath())
-                        .setGetRegionFunction(
-                                (subpartitionId, integer) -> {
-                                    indexQueryTime.incrementAndGet();
-                                    return Optional.of(
-                                            new ProducerMergedPartitionFileIndex.FixedSizeRegion(
-                                                    0, 0, 2));
-                                })
-                        .build();
-        partitionFileReader =
-                new ProducerMergedPartitionFileReader(
-                        testFilePath, partitionFileIndex, cacheNumber);
-        // Read different subpartitions from the reader and make cache reach max number.
-        for (int subpartitionId = 0; subpartitionId < cacheNumber * 2; ++subpartitionId) {
-            assertThat(
-                            readBuffer(
-                                    subpartitionId < cacheNumber ? 0 : 1,
-                                    new TieredStorageSubpartitionId(subpartitionId % cacheNumber)))
-                    .isNotNull();
+    void testReadProgress() throws IOException {
+        long currentFileOffset = 0;
+        ProducerMergedPartitionFileReader.ProducerMergedReadProgress readProgress = null;
+        CompositeBuffer partialBuffer = null;
+        for (int bufferIndex = 0; bufferIndex < DEFAULT_BUFFER_NUMBER; ) {
+            PartitionFileReader.ReadBufferResult readBufferResult =
+                    readBuffer(bufferIndex, DEFAULT_SUBPARTITION_ID, readProgress, partialBuffer);
+            assertThat(readBufferResult).isNotNull();
+            assertThat(readBufferResult.getReadProgress())
+                    .isInstanceOf(
+                            ProducerMergedPartitionFileReader.ProducerMergedReadProgress.class);
+            readProgress =
+                    (ProducerMergedPartitionFileReader.ProducerMergedReadProgress)
+                            readBufferResult.getReadProgress();
+            for (Buffer buffer : readBufferResult.getReadBuffers()) {
+                if (buffer instanceof CompositeBuffer) {
+                    partialBuffer = (CompositeBuffer) buffer;
+                    if (partialBuffer.missingLength() == 0) {
+                        bufferIndex++;
+                        currentFileOffset += partialBuffer.readableBytes() + HEADER_LENGTH;
+                        partialBuffer.recycleBuffer();
+                        partialBuffer = null;
+                    }
+                } else {
+                    bufferIndex++;
+                    currentFileOffset += buffer.readableBytes() + HEADER_LENGTH;
+                    buffer.recycleBuffer();
+                }
+            }
+            assertThat(readProgress.getCurrentBufferOffset()).isEqualTo(currentFileOffset);
         }
-        // The following buffer reading from other subpartitions can only query the index.
-        assertThat(readBuffer(0, new TieredStorageSubpartitionId(3))).isNotNull();
-        assertThat(readBuffer(0, new TieredStorageSubpartitionId(3))).isNotNull();
-        assertThat(indexQueryTime).hasValue(5);
     }
 
     @Test
@@ -170,7 +201,16 @@ class ProducerMergedPartitionFileReaderTest {
         assertThat(testFilePath.toFile().exists()).isFalse();
     }
 
-    private Buffer readBuffer(int bufferIndex, TieredStorageSubpartitionId subpartitionId)
+    private List<Buffer> readBuffer(int bufferIndex, TieredStorageSubpartitionId subpartitionId)
+            throws IOException {
+        return readBuffer(bufferIndex, subpartitionId, null, null).getReadBuffers();
+    }
+
+    private PartitionFileReader.ReadBufferResult readBuffer(
+            int bufferIndex,
+            TieredStorageSubpartitionId subpartitionId,
+            PartitionFileReader.ReadProgress readProgress,
+            CompositeBuffer partialBuffer)
             throws IOException {
         MemorySegment memorySegment =
                 MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
@@ -180,6 +220,8 @@ class ProducerMergedPartitionFileReaderTest {
                 DEFAULT_SEGMENT_ID,
                 bufferIndex,
                 memorySegment,
-                FreeingBufferRecycler.INSTANCE);
+                FreeingBufferRecycler.INSTANCE,
+                readProgress,
+                partialBuffer);
     }
 }
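The two rewritten tests above drive the same consumption loop. Distilled, with hasMoreBuffers() and consume() as hypothetical stand-ins for the test assertions, it looks roughly like this:

    // Distilled from the tests above (illustration only). A single readBuffer()
    // call may return several complete buffers plus, at the tail, a
    // CompositeBuffer that is still missing bytes; that partial buffer and the
    // ReadProgress are fed back so the next call resumes slicing where this
    // one stopped.
    PartitionFileReader.ReadProgress progress = null;
    CompositeBuffer partialBuffer = null;
    while (hasMoreBuffers()) { // hypothetical loop condition
        PartitionFileReader.ReadBufferResult result =
                reader.readBuffer(
                        partitionId, subpartitionId, segmentId, bufferIndex,
                        segment, recycler, progress, partialBuffer);
        progress = result.getReadProgress();
        for (Buffer buffer : result.getReadBuffers()) {
            if (buffer instanceof CompositeBuffer
                    && ((CompositeBuffer) buffer).missingLength() > 0) {
                partialBuffer = (CompositeBuffer) buffer; // completed by the next call
            } else {
                consume(buffer); // hypothetical sink; recycles the buffer
                partialBuffer = null;
                bufferIndex++;
            }
        }
    }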
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
index 1ba7fb36070..766e0827a37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReaderTest.java
@@ -102,12 +102,12 @@ class SegmentPartitionFileReaderTest {
     void testGetPriority() throws IOException {
         assertThat(
                         partitionFileReader.getPriority(
-                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0, 0))
+                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0, 0, null))
                 .isEqualTo(-1);
         assertThat(readBuffer(0, DEFAULT_SUBPARTITION_ID, 0)).isNotNull();
         assertThat(
                         partitionFileReader.getPriority(
-                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0, 1))
+                                DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0, 1, null))
                 .isEqualTo(-1);
     }
 
@@ -116,12 +116,19 @@
             throws IOException {
         MemorySegment memorySegment =
                 MemorySegmentFactory.allocateUnpooledSegment(DEFAULT_BUFFER_SIZE);
-        return partitionFileReader.readBuffer(
-                DEFAULT_PARTITION_ID,
-                subpartitionId,
-                segmentId,
-                bufferIndex,
-                memorySegment,
-                FreeingBufferRecycler.INSTANCE);
+        PartitionFileReader.ReadBufferResult readBufferResult =
+                partitionFileReader.readBuffer(
+                        DEFAULT_PARTITION_ID,
+                        subpartitionId,
+                        segmentId,
+                        bufferIndex,
+                        memorySegment,
+                        FreeingBufferRecycler.INSTANCE,
+                        null,
+                        null);
+        if (readBufferResult == null) {
+            return null;
+        }
+        return readBufferResult.getReadBuffers().get(0);
    }
 }
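Both getPriority() call sites above now take the read progress as a fifth argument; the segment-file reader ignores it and keeps returning -1, while the producer-merged reader, per the expectations in ProducerMergedPartitionFileReaderTest, reports the next file offset to read. A sketch of the latter, assuming only the accessor shown in the tests; the fallback value is a guess, not Flink's exact code:

    // Illustrative implementation, not copied from the commit: a reader over one
    // merged file can use the current buffer offset as its scheduling priority,
    // so the DiskIOScheduler serves readers in file-offset order.
    @Override
    public long getPriority(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            int segmentId,
            int bufferIndex,
            @Nullable ReadProgress readProgress) {
        if (readProgress instanceof ProducerMergedReadProgress) {
            // Resume from where the previous large read stopped.
            return ((ProducerMergedReadProgress) readProgress).getCurrentBufferOffset();
        }
        return 0L; // assumed fallback: no progress yet, start of the file
    }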
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
index 572f3b97d3b..c88b68ac486 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/TestingPartitionFileReader.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -31,14 +33,14 @@ import java.util.function.Function;
 
 /** Testing implementation for {@link PartitionFileReader}. */
 public class TestingPartitionFileReader implements PartitionFileReader {
 
-    private final BiFunction<Integer, Integer, Buffer> readBufferFunction;
+    private final BiFunction<Integer, Integer, ReadBufferResult> readBufferFunction;
 
     private final Function<Integer, Long> getPriorityFunction;
 
     private final Runnable releaseRunnable;
 
     private TestingPartitionFileReader(
-            BiFunction<Integer, Integer, Buffer> readBufferFunction,
+            BiFunction<Integer, Integer, ReadBufferResult> readBufferFunction,
             Function<Integer, Long> getPriorityFunction,
             Runnable releaseRunnable) {
         this.readBufferFunction = readBufferFunction;
@@ -47,13 +49,15 @@ public class TestingPartitionFileReader implements PartitionFileReader {
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
         return readBufferFunction.apply(bufferIndex, segmentId);
     }
@@ -63,7 +67,8 @@ public class TestingPartitionFileReader implements PartitionFileReader {
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            @Nullable ReadProgress readProgress) {
         return getPriorityFunction.apply(subpartitionId.getSubpartitionId());
     }
 
@@ -74,7 +79,7 @@ public class TestingPartitionFileReader implements PartitionFileReader {
     /** Builder for {@link TestingPartitionFileReader}. */
     public static class Builder {
 
-        private BiFunction<Integer, Integer, Buffer> readBufferSupplier =
+        private BiFunction<Integer, Integer, ReadBufferResult> readBufferSupplier =
                 (bufferIndex, segmentId) -> null;
 
         private Function<Integer, Long> prioritySupplier = bufferIndex -> 0L;
@@ -82,7 +87,7 @@ public class TestingPartitionFileReader implements PartitionFileReader {
 
         private Runnable releaseNotifier = () -> {};
 
         public Builder setReadBufferSupplier(
-                BiFunction<Integer, Integer, Buffer> readBufferSupplier) {
+                BiFunction<Integer, Integer, ReadBufferResult> readBufferSupplier) {
             this.readBufferSupplier = readBufferSupplier;
             return this;
         }
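For completeness, wiring the updated testing stub now looks roughly like this; the builder's build() terminator falls outside this excerpt and is assumed:

    // Hypothetical usage of the updated builder, following the
    // DiskIOSchedulerTest hunk earlier in this patch.
    TestingPartitionFileReader reader =
            new TestingPartitionFileReader.Builder()
                    .setReadBufferSupplier(
                            (bufferIndex, segmentId) ->
                                    new PartitionFileReader.ReadBufferResult(
                                            Collections.singletonList(
                                                    BufferBuilderTestUtils.buildSomeBuffer(0)),
                                            true,
                                            null))
                    .build(); // assumed: build() is defined past the excerpt's cut-off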