This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 83558c7b0c97 refactor: Set merge memory for flink COW write path properly (#14010)
83558c7b0c97 is described below

commit 83558c7b0c975301a4ca21fc4883a31466ae2fbd
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Sep 30 13:27:43 2025 +0800

    refactor: Set merge memory for flink COW write path properly (#14010)
---
 .../org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java  |  8 ++++++--
 .../apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java | 12 ++----------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 478cf0bee2b1..f5f491740eb0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -259,7 +259,7 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieWriteMerg
     Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema())
         .map(internalSchema -> 
AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, 
internalSchema,
             
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
-    long maxMemoryPerCompaction = 
IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
+    long maxMemoryPerCompaction = getMaxMemoryForMerge();
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(maxMemoryPerCompaction));
     Option<Stream<HoodieLogFile>> logFilesStreamOpt = 
compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
@@ -308,7 +308,11 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieWriteMerg
     }
   }
 
-  protected HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, 
Option<InternalSchema> internalSchemaOption, TypedProperties props,
+  protected long getMaxMemoryForMerge() {
+    return IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
+  }
+
+  private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, 
Option<InternalSchema> internalSchemaOption, TypedProperties props,
                                                         
Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> 
incomingRecordsItr) {
     HoodieFileGroupReader.Builder<T> fileGroupBuilder = 
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
         
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
index 2ab001a92e11..d90fd2c219cc 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
@@ -18,18 +18,13 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -40,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.Stream;
 
 /**
  * A {@link FileGroupReaderBasedMergeHandle} that supports MERGE write 
incrementally(small data buffers).
@@ -75,12 +69,10 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, 
O>
   }
 
   @Override
-  protected HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, 
Option<InternalSchema> internalSchemaOption, TypedProperties props,
-                                                        
Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> 
incomingRecordsItr) {
+  protected long getMaxMemoryForMerge() {
     // the incoming records are already buffered on heap, and the underlying 
bytes are managed by memory pool
     // in Flink write buffer, so there is no need to spill.
-    props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(Long.MAX_VALUE));
-    return super.getFileGroupReader(usePosition, internalSchemaOption, props, 
logFileStreamOpt, incomingRecordsItr);
+    return Long.MAX_VALUE;
   }
 
   /**

Reply via email to