This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e95a9f56ae2703b566e9c21e58bacbd6379fa35e Author: ForwardXu <forwardxu...@gmail.com> AuthorDate: Wed Nov 9 13:07:55 2022 +0800 [HUDI-4526] Improve spillableMapBasePath when disk directory is full (#6284) (cherry picked from commit 371296173a7c51c325e6f9c3a3ef2ba5f6a89f6e) --- .../org/apache/hudi/config/HoodieMemoryConfig.java | 9 ++++-- .../table/log/HoodieMergedLogRecordScanner.java | 9 +++--- .../org/apache/hudi/common/util/FileIOUtils.java | 36 ++++++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java index 4e37796393..960ec61dc0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java @@ -22,9 +22,10 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; import javax.annotation.concurrent.Immutable; - import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig { public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty .key("hoodie.memory.spillable.map.path") .defaultValue("/tmp/") - .withDocumentation("Default file path prefix for spillable map"); + .withInferFunction(cfg -> { + String[] localDirs = FileIOUtils.getConfiguredLocalDirs(); + return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty(); + }) + .withDocumentation("Default file path for spillable map"); public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty .key("hoodie.memory.writestatus.failure.fraction") diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 5ef0a6821f..45975fbfde 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -18,6 +18,9 @@ package org.apache.hudi.common.table.log; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -34,12 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.internal.schema.InternalSchema; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -95,6 +93,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled); + this.maxMemorySizeInBytes = maxMemorySizeInBytes; } catch (IOException e) { throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java index 6a9e2e1b35..426a703503 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java @@ -204,4 +204,40 @@ public class FileIOUtils { public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) { return readDataFromPath(fileSystem, detailPath, false); } + + /** + * Return the configured local directories where hudi can write files. This + * method does not create any directories on its own, it only encapsulates the + * logic of locating the local directories according to deployment mode. + */ + public static String[] getConfiguredLocalDirs() { + if (isRunningInYarnContainer()) { + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. Note this assumes that Yarn has + // created the directories already, and that they are secured so that only the + // user has access to them. + return getYarnLocalDirs().split(","); + } else if (System.getProperty("java.io.tmpdir") != null) { + return System.getProperty("java.io.tmpdir").split(","); + } else { + return null; + } + } + + private static boolean isRunningInYarnContainer() { + // These environment variables are set by YARN. + return System.getenv("CONTAINER_ID") != null; + } + + /** + * Get the Yarn approved local directories. + */ + private static String getYarnLocalDirs() { + String localDirs = Option.of(System.getenv("LOCAL_DIRS")).orElse(""); + + if (localDirs.isEmpty()) { + throw new HoodieIOException("Yarn Local dirs can't be empty"); + } + return localDirs; + } }