This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7997cf152cc59b45c51977b825c379c0421b575e
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Fri Sep 15 10:32:34 2023 +0800

    [FLINK-33044][network] Reduce the frequency of triggering flushes for the disk tier of tiered storage
---
 .../tiered/common/TieredStorageConfiguration.java  | 10 ++++++
 .../hybrid/tiered/tier/disk/DiskCacheManager.java  | 26 +++++++++++----
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   |  5 +++
 .../tiered/tier/disk/DiskTierProducerAgent.java    |  2 ++
 .../tiered/tier/disk/DiskCacheManagerTest.java     | 37 ++++++++++++++++++++++
 .../tier/disk/DiskTierProducerAgentTest.java       |  1 +
 6 files changed, 75 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index d031d7278c4..abc6c955d53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -67,6 +67,8 @@ public class TieredStorageConfiguration {
 
     private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 
* 1024L;
 
+    private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 
1024;
+
     private final String remoteStorageBasePath;
 
     private final int tieredStorageBufferSize;
@@ -330,6 +332,8 @@ public class TieredStorageConfiguration {
 
         private long numRetainedInMemoryRegionsMax = 
DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY;
 
+        private int maxCachedBytesBeforeFlush = 
DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH;
+
         private List<TierFactory> tierFactories;
 
         private List<Integer> tierExclusiveBuffers;
@@ -416,6 +420,11 @@ public class TieredStorageConfiguration {
             return this;
         }
 
+        public Builder setMaxCachedBytesBeforeFlush(int 
maxCachedBytesBeforeFlush) {
+            this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
+            return this;
+        }
+
         public TieredStorageConfiguration build() {
             setupTierFactoriesAndExclusiveBuffers();
             return new TieredStorageConfiguration(
@@ -451,6 +460,7 @@ public class TieredStorageConfiguration {
                             tieredStorageBufferSize,
                             minReserveDiskSpaceFraction,
                             regionGroupSizeInBytes,
+                            maxCachedBytesBeforeFlush,
                             numRetainedInMemoryRegionsMax));
             tierExclusiveBuffers.add(diskTierExclusiveBuffers);
             if (remoteStorageBasePath != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index f5d7f30e0c0..feb3a1eb1d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -41,6 +41,8 @@ class DiskCacheManager {
 
     private final int numSubpartitions;
 
+    private final int maxCachedBytesBeforeFlush;
+
     private final PartitionFileWriter partitionFileWriter;
 
     private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
@@ -48,13 +50,21 @@ class DiskCacheManager {
     /** Whether the current flush process has completed. */
     private CompletableFuture<Void> hasFlushCompleted;
 
+    /**
+     * The total number of bytes cached across all subpartitions in this cache manager. Note that
+     * the counter is only accessed by the task thread, so it does not require locks.
+     */
+    private int numCachedBytesCounter;
+
     DiskCacheManager(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
+            int maxCachedBytesBeforeFlush,
             TieredStorageMemoryManager memoryManager,
             PartitionFileWriter partitionFileWriter) {
         this.partitionId = partitionId;
         this.numSubpartitions = numSubpartitions;
+        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
         this.partitionFileWriter = partitionFileWriter;
         this.subpartitionCacheManagers = new 
SubpartitionDiskCacheManager[numSubpartitions];
         this.hasFlushCompleted = FutureUtils.completedVoidFuture();
@@ -81,6 +91,7 @@ class DiskCacheManager {
      */
     void append(Buffer buffer, int subpartitionId) {
         subpartitionCacheManagers[subpartitionId].append(buffer);
+        increaseNumCachedBytesAndCheckFlush(buffer.readableBytes());
     }
 
     /**
@@ -92,12 +103,7 @@ class DiskCacheManager {
      */
     void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId) {
         
subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record);
-
-        // When finishing a segment, the buffers should be flushed because the 
next segment may be
-        // written to another tier. If the buffers in this tier are not 
flushed here, then the next
-        // segment in another tier may be stuck by lacking buffers. This flush 
has a low trigger
-        // frequency, so its impact on performance is relatively small.
-        forceFlushCachedBuffers();
+        increaseNumCachedBytesAndCheckFlush(record.remaining());
     }
 
     /**
@@ -127,6 +133,13 @@ class DiskCacheManager {
     //  Internal Methods
     // ------------------------------------------------------------------------
 
+    private void increaseNumCachedBytesAndCheckFlush(int 
numIncreasedCachedBytes) {
+        numCachedBytesCounter += numIncreasedCachedBytes;
+        if (numCachedBytesCounter > maxCachedBytesBeforeFlush) {
+            forceFlushCachedBuffers();
+        }
+    }
+
     private void notifyFlushCachedBuffers() {
         flushBuffers(false);
     }
@@ -153,6 +166,7 @@ class DiskCacheManager {
                 hasFlushCompleted = flushCompletableFuture;
             }
         }
+        numCachedBytesCounter = 0;
     }
 
     private int getSubpartitionToFlushBuffers(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index fad6677f4b8..4806e2346e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -54,6 +54,8 @@ public class DiskTierFactory implements TierFactory {
 
     private final int regionGroupSizeInBytes;
 
+    private final int maxCachedBytesBeforeFlush;
+
     private final long numRetainedInMemoryRegionsMax;
 
     public DiskTierFactory(
@@ -61,11 +63,13 @@ public class DiskTierFactory implements TierFactory {
             int bufferSizeBytes,
             float minReservedDiskSpaceFraction,
             int regionGroupSizeInBytes,
+            int maxCachedBytesBeforeFlush,
             long numRetainedInMemoryRegionsMax) {
         this.numBytesPerSegment = numBytesPerSegment;
         this.bufferSizeBytes = bufferSizeBytes;
         this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
         this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
         this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
     }
 
@@ -106,6 +110,7 @@ public class DiskTierFactory implements TierFactory {
                 numSubpartitions,
                 numBytesPerSegment,
                 bufferSizeBytes,
+                maxCachedBytesBeforeFlush,
                 dataFilePath,
                 minReservedDiskSpaceFraction,
                 isBroadcastOnly,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index cf91eca86f6..acd8c3a392d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -86,6 +86,7 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
             int numSubpartitions,
             int numBytesPerSegment,
             int bufferSizeBytes,
+            int maxCachedBytesBeforeFlush,
             Path dataFilePath,
             float minReservedDiskSpaceFraction,
             boolean isBroadcastOnly,
@@ -122,6 +123,7 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
                 new DiskCacheManager(
                         partitionId,
                         isBroadcastOnly ? 1 : numSubpartitions,
+                        maxCachedBytesBeforeFlush,
                         memoryManager,
                         partitionFileWriter);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
index 7297d89d8e7..b07c550c145 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
@@ -73,6 +73,7 @@ class DiskCacheManagerTest {
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new 
ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
 
@@ -110,11 +111,13 @@ class DiskCacheManagerTest {
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new 
ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
         diskCacheManager.appendEndOfSegmentEvent(
                 EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 
0);
 
+        diskCacheManager.close();
         assertThat(receivedBuffers).hasSize(1);
         List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts =
                 receivedBuffers.get(0).getSegmentBufferContexts();
@@ -131,6 +134,39 @@ class DiskCacheManagerTest {
         assertThat(event).isInstanceOf(EndOfSegmentEvent.class);
     }
 
+    @Test
+    void testFlushWhenCachedBytesReachLimit() throws IOException {
+        TestingTieredStorageMemoryManager memoryManager =
+                new TestingTieredStorageMemoryManager.Builder().build();
+
+        AtomicInteger numWriteTimes = new AtomicInteger(0);
+        TestingPartitionFileWriter partitionFileWriter =
+                new TestingPartitionFileWriter.Builder()
+                        .setWriteFunction(
+                                (partitionId, subpartitionBufferContexts) -> {
+                                    numWriteTimes.incrementAndGet();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+        DiskCacheManager diskCacheManager =
+                new DiskCacheManager(
+                        TieredStorageIdMappingUtils.convertId(new 
ResultPartitionID()),
+                        1,
+                        1024,
+                        memoryManager,
+                        partitionFileWriter);
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 
0);
+        assertThat(numWriteTimes).hasValue(0);
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), 0);
+        assertThat(numWriteTimes).hasValue(1);
+
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 
0);
+        assertThat(numWriteTimes).hasValue(1);
+        diskCacheManager.appendEndOfSegmentEvent(
+                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 
0);
+        assertThat(numWriteTimes).hasValue(2);
+    }
+
     @Test
     void testRelease() {
         AtomicBoolean isReleased = new AtomicBoolean(false);
@@ -144,6 +180,7 @@ class DiskCacheManagerTest {
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new 
ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
         diskCacheManager.release();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 3daf34c4a97..58c09ff9df1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -212,6 +212,7 @@ public class DiskTierProducerAgentTest {
                 NUM_SUBPARTITIONS,
                 numBytesPerSegment,
                 BUFFER_SIZE_BYTES,
+                BUFFER_SIZE_BYTES,
                 dataFilePath,
                 minReservedDiskSpaceFraction,
                 isBroadcastOnly,

Reply via email to