This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7997cf152cc59b45c51977b825c379c0421b575e Author: Yuxin Tan <tanyuxinw...@gmail.com> AuthorDate: Fri Sep 15 10:32:34 2023 +0800 [FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage --- .../tiered/common/TieredStorageConfiguration.java | 10 ++++++ .../hybrid/tiered/tier/disk/DiskCacheManager.java | 26 +++++++++++---- .../hybrid/tiered/tier/disk/DiskTierFactory.java | 5 +++ .../tiered/tier/disk/DiskTierProducerAgent.java | 2 ++ .../tiered/tier/disk/DiskCacheManagerTest.java | 37 ++++++++++++++++++++++ .../tier/disk/DiskTierProducerAgentTest.java | 1 + 6 files changed, 75 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java index d031d7278c4..abc6c955d53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java @@ -67,6 +67,8 @@ public class TieredStorageConfiguration { private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L; + private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024; + private final String remoteStorageBasePath; private final int tieredStorageBufferSize; @@ -330,6 +332,8 @@ public class TieredStorageConfiguration { private long numRetainedInMemoryRegionsMax = DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY; + private int maxCachedBytesBeforeFlush = DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH; + private List<TierFactory> tierFactories; private List<Integer> tierExclusiveBuffers; @@ -416,6 +420,11 @@ public class TieredStorageConfiguration { return this; } + public Builder setMaxCachedBytesBeforeFlush(int maxCachedBytesBeforeFlush) { + this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; + return this; + } + public TieredStorageConfiguration build() { setupTierFactoriesAndExclusiveBuffers(); return new TieredStorageConfiguration( @@ -451,6 +460,7 @@ public class TieredStorageConfiguration { tieredStorageBufferSize, minReserveDiskSpaceFraction, regionGroupSizeInBytes, + maxCachedBytesBeforeFlush, numRetainedInMemoryRegionsMax)); tierExclusiveBuffers.add(diskTierExclusiveBuffers); if (remoteStorageBasePath != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java index f5d7f30e0c0..feb3a1eb1d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java @@ -41,6 +41,8 @@ class DiskCacheManager { private final int numSubpartitions; + private final int maxCachedBytesBeforeFlush; + private final PartitionFileWriter partitionFileWriter; private final SubpartitionDiskCacheManager[] subpartitionCacheManagers; @@ -48,13 +50,21 @@ class DiskCacheManager { /** Whether the current flush process has completed. */ private CompletableFuture<Void> hasFlushCompleted; + /** + * The number of all subpartition's cached bytes in the cache manager. Note that the counter can + * only be accessed by the task thread and does not require locks. + */ + private int numCachedBytesCounter; + DiskCacheManager( TieredStoragePartitionId partitionId, int numSubpartitions, + int maxCachedBytesBeforeFlush, TieredStorageMemoryManager memoryManager, PartitionFileWriter partitionFileWriter) { this.partitionId = partitionId; this.numSubpartitions = numSubpartitions; + this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; this.partitionFileWriter = partitionFileWriter; this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions]; this.hasFlushCompleted = FutureUtils.completedVoidFuture(); @@ -81,6 +91,7 @@ class DiskCacheManager { */ void append(Buffer buffer, int subpartitionId) { subpartitionCacheManagers[subpartitionId].append(buffer); + increaseNumCachedBytesAndCheckFlush(buffer.readableBytes()); } /** @@ -92,12 +103,7 @@ class DiskCacheManager { */ void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId) { subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record); - - // When finishing a segment, the buffers should be flushed because the next segment may be - // written to another tier. If the buffers in this tier are not flushed here, then the next - // segment in another tier may be stuck by lacking buffers. This flush has a low trigger - // frequency, so its impact on performance is relatively small. - forceFlushCachedBuffers(); + increaseNumCachedBytesAndCheckFlush(record.remaining()); } /** @@ -127,6 +133,13 @@ class DiskCacheManager { // Internal Methods // ------------------------------------------------------------------------ + private void increaseNumCachedBytesAndCheckFlush(int numIncreasedCachedBytes) { + numCachedBytesCounter += numIncreasedCachedBytes; + if (numCachedBytesCounter > maxCachedBytesBeforeFlush) { + forceFlushCachedBuffers(); + } + } + private void notifyFlushCachedBuffers() { flushBuffers(false); } @@ -153,6 +166,7 @@ class DiskCacheManager { hasFlushCompleted = flushCompletableFuture; } } + numCachedBytesCounter = 0; } private int getSubpartitionToFlushBuffers( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java index fad6677f4b8..4806e2346e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java @@ -54,6 +54,8 @@ public class DiskTierFactory implements TierFactory { private final int regionGroupSizeInBytes; + private final int maxCachedBytesBeforeFlush; + private final long numRetainedInMemoryRegionsMax; public DiskTierFactory( @@ -61,11 +63,13 @@ public class DiskTierFactory implements TierFactory { int bufferSizeBytes, float minReservedDiskSpaceFraction, int regionGroupSizeInBytes, + int maxCachedBytesBeforeFlush, long numRetainedInMemoryRegionsMax) { this.numBytesPerSegment = numBytesPerSegment; this.bufferSizeBytes = bufferSizeBytes; this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction; this.regionGroupSizeInBytes = regionGroupSizeInBytes; + this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush; this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax; } @@ -106,6 +110,7 @@ public class DiskTierFactory implements TierFactory { numSubpartitions, numBytesPerSegment, bufferSizeBytes, + maxCachedBytesBeforeFlush, dataFilePath, minReservedDiskSpaceFraction, isBroadcastOnly, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java index cf91eca86f6..acd8c3a392d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java @@ -86,6 +86,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro int numSubpartitions, int numBytesPerSegment, int bufferSizeBytes, + int maxCachedBytesBeforeFlush, Path dataFilePath, float minReservedDiskSpaceFraction, boolean isBroadcastOnly, @@ -122,6 +123,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro new DiskCacheManager( partitionId, isBroadcastOnly ? 1 : numSubpartitions, + maxCachedBytesBeforeFlush, memoryManager, partitionFileWriter); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java index 7297d89d8e7..b07c550c145 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java @@ -73,6 +73,7 @@ class DiskCacheManagerTest { new DiskCacheManager( TieredStorageIdMappingUtils.convertId(new ResultPartitionID()), 1, + 1024, memoryManager, partitionFileWriter); @@ -110,11 +111,13 @@ class DiskCacheManagerTest { new DiskCacheManager( TieredStorageIdMappingUtils.convertId(new ResultPartitionID()), 1, + 1024, memoryManager, partitionFileWriter); diskCacheManager.appendEndOfSegmentEvent( EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0); + diskCacheManager.close(); assertThat(receivedBuffers).hasSize(1); List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts = receivedBuffers.get(0).getSegmentBufferContexts(); @@ -131,6 +134,39 @@ class DiskCacheManagerTest { assertThat(event).isInstanceOf(EndOfSegmentEvent.class); } + @Test + void testFlushWhenCachedBytesReachLimit() throws IOException { + TestingTieredStorageMemoryManager memoryManager = + new TestingTieredStorageMemoryManager.Builder().build(); + + AtomicInteger numWriteTimes = new AtomicInteger(0); + TestingPartitionFileWriter partitionFileWriter = + new TestingPartitionFileWriter.Builder() + .setWriteFunction( + (partitionId, subpartitionBufferContexts) -> { + numWriteTimes.incrementAndGet(); + return FutureUtils.completedVoidFuture(); + }) + .build(); + DiskCacheManager diskCacheManager = + new DiskCacheManager( + TieredStorageIdMappingUtils.convertId(new ResultPartitionID()), + 1, + 1024, + memoryManager, + partitionFileWriter); + diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0); + assertThat(numWriteTimes).hasValue(0); + diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), 0); + assertThat(numWriteTimes).hasValue(1); + + diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0); + assertThat(numWriteTimes).hasValue(1); + diskCacheManager.appendEndOfSegmentEvent( + EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0); + assertThat(numWriteTimes).hasValue(2); + } + @Test void testRelease() { AtomicBoolean isReleased = new AtomicBoolean(false); @@ -144,6 +180,7 @@ class DiskCacheManagerTest { new DiskCacheManager( TieredStorageIdMappingUtils.convertId(new ResultPartitionID()), 1, + 1024, memoryManager, partitionFileWriter); diskCacheManager.release(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java index 3daf34c4a97..58c09ff9df1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java @@ -212,6 +212,7 @@ public class DiskTierProducerAgentTest { NUM_SUBPARTITIONS, numBytesPerSegment, BUFFER_SIZE_BYTES, + BUFFER_SIZE_BYTES, dataFilePath, minReservedDiskSpaceFraction, isBroadcastOnly,