This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5579715ef7d9 fix(flink): reuse the preceding avg size if there is no 
eligible estimation (#19022)
5579715ef7d9 is described below

commit 5579715ef7d926fc68c9b0dc1918ff835c798e89
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 17 11:25:41 2026 +0800

    fix(flink): reuse the preceding avg size if there is no eligible 
estimation (#19022)
    
    * fix(flink): reuse the preceding avg size if there is no eligible 
estimation
---
 .../partitioner/profile/DeltaWriteProfile.java     |  6 +-
 .../sink/partitioner/profile/WriteProfile.java     |  8 +-
 .../hudi/sink/partitioner/TestBucketAssigner.java  | 85 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 9 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index a79c78e6d363..2cdee3453a46 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -29,8 +29,6 @@ import 
org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.commit.SmallFile;
 
-import lombok.extern.slf4j.Slf4j;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -41,7 +39,6 @@ import java.util.stream.Collectors;
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
-@Slf4j
 public class DeltaWriteProfile extends WriteProfile {
 
   public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext 
context) {
@@ -93,7 +90,7 @@ public class DeltaWriteProfile extends WriteProfile {
 
   @Override
   protected long averageBytesPerRecord() {
-    long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+    long avgSize = this.avgSize > 0 ? this.avgSize : 
config.getCopyOnWriteRecordSizeEstimate();
     HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
     if (!commitTimeline.empty()) {
       long sizeFromCommitMetadata = 
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -109,7 +106,6 @@ public class DeltaWriteProfile extends WriteProfile {
         }
       }
     }
-    log.info("Refresh average bytes per record => " + avgSize);
     return avgSize;
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index c3819bf08116..304d8f5d7452 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -78,7 +78,7 @@ public class WriteProfile {
    * The average record size.
    */
   @Getter
-  private long avgSize = -1L;
+  protected long avgSize = -1L;
 
   /**
    * Total records to write for each bucket based on
@@ -134,7 +134,7 @@ public class WriteProfile {
    * records pack into one file.
    */
   protected long averageBytesPerRecord() {
-    long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+    long avgSize = this.avgSize > 0 ? this.avgSize : 
config.getCopyOnWriteRecordSizeEstimate();
     HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
     if (!commitTimeline.empty()) {
       long sizeFromCommitMetadata = 
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -142,7 +142,6 @@ public class WriteProfile {
         avgSize = sizeFromCommitMetadata;
       }
     }
-    log.info("Refresh average bytes per record => " + avgSize);
     return avgSize;
   }
 
@@ -238,8 +237,9 @@ public class WriteProfile {
 
   private void recordProfile() {
     this.avgSize = averageBytesPerRecord();
+    log.info("Refresh average bytes per record => {}", avgSize);
     this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
-    log.info("Refresh insert records per bucket => " + recordsPerBucket);
+    log.info("Refresh insert records per bucket => {}", recordsPerBucket);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 8281ebbd86e4..3b5c265f7c91 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -46,11 +47,13 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -444,6 +447,26 @@ public class TestBucketAssigner {
         writeProfile.getRecordsPerBucket(), 
is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
   }
 
+  @Test
+  public void 
testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload() throws 
Exception {
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
"1024");
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+    setScriptedRecordSizes(512L, -1L);
+    WriteProfile writeProfile = new 
ScriptedRecordSizeWriteProfile(writeConfig, context);
+    assertThat("Average record size should use the profiled commit metadata",
+        writeProfile.getAvgSize(), is(512L));
+
+    writeProfile.reload(1);
+
+    assertThat("Average record size should reuse the previous estimate when no 
eligible commit metadata is found",
+        writeProfile.getAvgSize(), is(512L));
+    assertThat("Records per bucket should continue to use the previous 
estimate",
+        writeProfile.getRecordsPerBucket(), 
is(writeConfig.getParquetMaxFileSize() / 512L));
+  }
+
   @Test
   public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() 
throws Exception {
     File morPath = new File(tempFile, "mor");
@@ -505,6 +528,34 @@ public class TestBucketAssigner {
         writeProfile.getRecordsPerBucket(), 
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
   }
 
+  @Test
+  public void 
testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload() 
throws Exception {
+    File morPath = new File(tempFile, "mor_reuse_previous_avg");
+    Configuration morConf = 
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+    morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
 "1024");
+    StreamerUtil.initTableIfNotExists(morConf);
+    TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+    HoodieWriteConfig morWriteConfig = 
FlinkWriteClients.getHoodieClientConfig(morConf);
+    HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+        
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+        new FlinkTaskContextSupplier(null));
+
+    setScriptedRecordSizes(256L, -1L);
+    DeltaWriteProfile writeProfile = new 
ScriptedRecordSizeDeltaWriteProfile(morWriteConfig, morContext);
+    assertThat("Average record size should use the profiled delta commit 
metadata",
+        writeProfile.getAvgSize(), is(256L));
+
+    writeProfile.reload(1);
+
+    assertThat("Average record size should reuse the previous estimate when no 
eligible delta commit metadata is found",
+        writeProfile.getAvgSize(), is(256L));
+    assertThat("Records per bucket should continue to use the previous 
estimate",
+        writeProfile.getRecordsPerBucket(), 
is(morWriteConfig.getParquetMaxFileSize() / 256L));
+  }
+
   private static String getLastCompleteInstant(WriteProfile profile) {
     return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
   }
@@ -526,6 +577,40 @@ public class TestBucketAssigner {
     assertThat(bucketInfo.getBucketType(), is(bucketType));
   }
 
+  private static Queue<Long> scriptedRecordSizes = new ArrayDeque<>();
+
+  private static void setScriptedRecordSizes(Long... recordSizes) {
+    scriptedRecordSizes = new ArrayDeque<>(Arrays.asList(recordSizes));
+  }
+
+  /**
+   * WriteProfile with scripted record size estimates.
+   */
+  static class ScriptedRecordSizeWriteProfile extends WriteProfile {
+    ScriptedRecordSizeWriteProfile(HoodieWriteConfig config, 
HoodieFlinkEngineContext context) {
+      super(config, context);
+    }
+
+    @Override
+    protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline 
commitTimeline, double fileSizeCalibrationRatio) {
+      return scriptedRecordSizes.remove();
+    }
+  }
+
+  /**
+   * DeltaWriteProfile with scripted record size estimates.
+   */
+  static class ScriptedRecordSizeDeltaWriteProfile extends DeltaWriteProfile {
+    ScriptedRecordSizeDeltaWriteProfile(HoodieWriteConfig config, 
HoodieFlinkEngineContext context) {
+      super(config, context);
+    }
+
+    @Override
+    protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline 
commitTimeline, double fileSizeCalibrationRatio) {
+      return scriptedRecordSizes.remove();
+    }
+  }
+
   /**
    * Mock BucketAssigner that can specify small files explicitly.
    */

Reply via email to