This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 83558c7b0c97 refactor: Set merge memory for flink COW write path properly (#14010)
83558c7b0c97 is described below
commit 83558c7b0c975301a4ca21fc4883a31466ae2fbd
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Sep 30 13:27:43 2025 +0800
refactor: Set merge memory for flink COW write path properly (#14010)
---
.../org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java | 8 ++++++--
.../apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java | 12 ++----------
2 files changed, 8 insertions(+), 12 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 478cf0bee2b1..f5f491740eb0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -259,7 +259,7 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieWriteMerg
     Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
         .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
-    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
+    long maxMemoryPerCompaction = getMaxMemoryForMerge();
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
     Option<Stream<HoodieLogFile>> logFilesStreamOpt = compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
@@ -308,7 +308,11 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieWriteMerg
     }
   }

-  protected HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<InternalSchema> internalSchemaOption, TypedProperties props,
+  protected long getMaxMemoryForMerge() {
+    return IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
+  }
+
+  private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<InternalSchema> internalSchemaOption, TypedProperties props,
                                                         Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
     HoodieFileGroupReader.Builder<T> fileGroupBuilder = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
         .withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
index 2ab001a92e11..d90fd2c219cc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
@@ -18,18 +18,13 @@
 package org.apache.hudi.io;

-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -40,7 +35,6 @@ import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.Iterator;
-import java.util.stream.Stream;

 /**
  * A {@link FileGroupReaderBasedMergeHandle} that supports MERGE write incrementally(small data buffers).
@@ -75,12 +69,10 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
   }

   @Override
-  protected HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<InternalSchema> internalSchemaOption, TypedProperties props,
-                                                        Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
+  protected long getMaxMemoryForMerge() {
     // the incoming records are already buffered on heap, and the underlying bytes are managed by memory pool
     // in Flink write buffer, so there is no need to spill.
-    props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(Long.MAX_VALUE));
-    return super.getFileGroupReader(usePosition, internalSchemaOption, props, logFileStreamOpt, incomingRecordsItr);
+    return Long.MAX_VALUE;
   }

 /**