This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 8b0a400  [HUDI-678] Make config package spark free (#1418)
8b0a400 is described below

commit 8b0a4009a9c662f45a0b7bbbb6f01c2b07c36809
Author: leesf <490081...@qq.com>
AuthorDate: Thu Mar 26 23:30:27 2020 +0800

    [HUDI-678] Make config package spark free (#1418)
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  3 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  5 +-
 .../apache/hudi/client/utils/SparkConfigUtils.java | 94 ++++++++++++++++++++++
 .../org/apache/hudi/config/HoodieMemoryConfig.java | 47 -----------
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 27 +------
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  3 +-
 .../org/apache/hudi/index/hbase/HBaseIndex.java    |  3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  3 +-
 .../compact/HoodieMergeOnReadTableCompactor.java   |  6 +-
 9 files changed, 110 insertions(+), 81 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 275e5d9..d1319b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
 import java.util.Collections;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -115,7 +116,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
       String instantTime) {
     // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
     // RDD actions that are performed after updating the index.
-    writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
+    writeStatusRDD = writeStatusRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
     Timer.Context indexTimer = metrics.getIndexCtx();
     // Update the index back
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 7ccb2a4..5f269a8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -1077,7 +1078,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
     // Force compaction action
-    statuses.persist(config.getWriteStatusStorageLevel());
+    statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
     // pass extra-metadata so that it gets stored in commit file automatically
     commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
         Option.ofNullable(compactionPlan.getExtraMetadata()));
@@ -1172,4 +1173,4 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     });
     return compactionInstantTimeOpt;
   }
-}
\ No newline at end of file
+}
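
[Editor's note] For anyone updating downstream code against this change: the Spark-typed getter is removed from HoodieWriteConfig, and callers hand the raw Properties to the new utility instead. A minimal sketch of the new call pattern, assuming a HoodieWriteConfig named config and a JavaRDD<WriteStatus> named writeStatusRDD already in scope:

    // Resolve the storage level from raw Properties rather than from a
    // Spark-typed getter on HoodieWriteConfig (which this commit removes).
    StorageLevel level = SparkConfigUtils.getWriteStatusStorageLevel(config.getProps());
    writeStatusRDD = writeStatusRDD.persist(level);
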
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
new file mode 100644
index 0000000..f6b8549
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.config.HoodieIndexConfig;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.Utils;
+
+import java.util.Properties;
+
+import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE;
+import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL;
+
+/**
+ * Spark config utils.
+ */
+public class SparkConfigUtils {
+
+  /**
+   * Dynamic calculation of max memory to use for the spillable map. user.available.memory = spark.executor.memory *
+   * (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
+   * the spark.executor.memory or the spark.memory.fraction is changed, the memory used for the spillable map changes
+   * accordingly.
+   */
+  public static long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {
+    final String SPARK_EXECUTOR_MEMORY_PROP = "spark.executor.memory";
+    final String SPARK_EXECUTOR_MEMORY_FRACTION_PROP = "spark.memory.fraction";
+    // This is hard-coded in spark code {@link
+    // https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/
+    // spark/memory/UnifiedMemoryManager.scala#L231} so have to re-define this here
+    final String DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION = "0.6";
+    // This is hard-coded in spark code {@link
+    // https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/
+    // spark/SparkContext.scala#L471} so have to re-define this here
+    final String DEFAULT_SPARK_EXECUTOR_MEMORY_MB = "1024"; // in MB
+
+    if (SparkEnv.get() != null) {
+      // 1 GB is the default conf used by Spark, look at SparkContext.scala
+      long executorMemoryInBytes = Utils.memoryStringToMb(
+          SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024 * 1024L;
+      // 0.6 is the default value used by Spark,
+      // look at {@link
+      // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
+      double memoryFraction = Double.parseDouble(
+          SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
+      double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
+      double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
+      long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
+      return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
+    } else {
+      return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+    }
+  }
+
+  public static StorageLevel getWriteStatusStorageLevel(Properties properties) {
+    return StorageLevel.fromString(properties.getProperty(WRITE_STATUS_STORAGE_LEVEL));
+  }
+
+  public static StorageLevel getBloomIndexInputStorageLevel(Properties properties) {
+    return StorageLevel.fromString(properties.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
+  }
+
+  public static long getMaxMemoryPerPartitionMerge(Properties properties) {
+    String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE);
+    return getMaxMemoryAllowedForMerge(fraction);
+  }
+
+  public static Long getMaxMemoryPerCompaction(Properties properties) {
+    String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
+    return getMaxMemoryAllowedForMerge(fraction);
+  }
+}
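
[Editor's note] The javadoc above encodes a two-stage budget: Spark reserves spark.memory.fraction of executor memory for its own pools, and Hudi then takes its configured fraction of the remainder for the spillable map. Purely illustrative arithmetic (not Hudi code), plugging in Spark's defaults and a merge fraction of 0.6 chosen only for this example:

    long executorMemoryInBytes = 1024L * 1024 * 1024;               // 1 GB, Spark's default executor memory
    double userAvailableMemory = executorMemoryInBytes * (1 - 0.6); // ~410 MB outside Spark's managed pool
    long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * 0.6); // ~246 MB for the spillable map

When not running on a Spark executor (SparkEnv.get() == null), the method skips this calculation and falls back to DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES.
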
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
index 2aca01e..8f0f5e8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
@@ -18,9 +18,6 @@
 package org.apache.hudi.config;
 
-import org.apache.spark.SparkEnv;
-import org.apache.spark.util.Utils;
-
 import javax.annotation.concurrent.Immutable;
 
 import java.io.File;
@@ -113,52 +110,8 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    /**
-     * Dynamic calculation of max memory to use for for spillable map. user.available.memory = spark.executor.memory *
-     * (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
-     * the spark.executor.memory or the spark.memory.fraction is changed, the memory used for spillable map changes
-     * accordingly
-     */
-    private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {
-      final String SPARK_EXECUTOR_MEMORY_PROP = "spark.executor.memory";
-      final String SPARK_EXECUTOR_MEMORY_FRACTION_PROP = "spark.memory.fraction";
-      // This is hard-coded in spark code {@link
-      // https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/
-      // spark/memory/UnifiedMemoryManager.scala#L231} so have to re-define this here
-      final String DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION = "0.6";
-      // This is hard-coded in spark code {@link
-      // https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/
-      // spark/SparkContext.scala#L471} so have to re-define this here
-      final String DEFAULT_SPARK_EXECUTOR_MEMORY_MB = "1024"; // in MB
-
-      if (SparkEnv.get() != null) {
-        // 1 GB is the default conf used by Spark, look at SparkContext.scala
-        long executorMemoryInBytes = Utils.memoryStringToMb(
-            SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024 * 1024L;
-        // 0.6 is the default value used by Spark,
-        // look at {@link
-        // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
-        double memoryFraction = Double.parseDouble(
-            SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
-        double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
-        double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
-        long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
-        return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
-      } else {
-        return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-      }
-    }
-
     public HoodieMemoryConfig build() {
       HoodieMemoryConfig config = new HoodieMemoryConfig(props);
-      setDefaultOnCondition(props, !props.containsKey(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP),
-          MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
-      setDefaultOnCondition(props, !props.containsKey(MAX_MEMORY_FRACTION_FOR_MERGE_PROP),
-          MAX_MEMORY_FRACTION_FOR_MERGE_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE);
-      setDefaultOnCondition(props, !props.containsKey(MAX_MEMORY_FOR_MERGE_PROP), MAX_MEMORY_FOR_MERGE_PROP,
-          String.valueOf(getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP))));
-      setDefaultOnCondition(props, !props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP), MAX_MEMORY_FOR_COMPACTION_PROP,
-          String.valueOf(getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP))));
       setDefaultOnCondition(props, !props.containsKey(MAX_DFS_STREAM_BUFFER_SIZE_PROP), MAX_DFS_STREAM_BUFFER_SIZE_PROP,
           String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
       setDefaultOnCondition(props, !props.containsKey(SPILLABLE_MAP_BASE_PATH_PROP), SPILLABLE_MAP_BASE_PATH_PROP,

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2d323a3..e63601f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -30,7 +30,6 @@ import org.apache.hudi.table.compact.strategy.CompactionStrategy;
 import org.apache.hudi.metrics.MetricsReporterType;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.spark.storage.StorageLevel;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -67,7 +66,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
   private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
   private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
-  private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
+  public static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
   private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
   private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
   private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
@@ -183,10 +182,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
   }
 
-  public StorageLevel getWriteStatusStorageLevel() {
-    return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
-  }
-
   public String getWriteStatusClassName() {
     return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
   }
@@ -430,10 +425,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET_PROP));
   }
 
-  public StorageLevel getBloomIndexInputStorageLevel() {
-    return StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
-  }
-
   public boolean getBloomIndexUpdatePartitionPath() {
     return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
   }
@@ -507,22 +498,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   /**
    * memory configs.
    */
-  public Double getMaxMemoryFractionPerPartitionMerge() {
-    return Double.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP));
-  }
-
-  public Double getMaxMemoryFractionPerCompaction() {
-    return Double.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP));
-  }
-
-  public Long getMaxMemoryPerPartitionMerge() {
-    return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
-  }
-
-  public Long getMaxMemoryPerCompaction() {
-    return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
-  }
-
   public int getMaxDFSStreamBufferSize() {
     return Integer.parseInt(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
   }
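
[Editor's note] Note the visibility flip on WRITE_STATUS_STORAGE_LEVEL from private to public: SparkConfigUtils statically imports the key, so the property name is now the contract between the Spark-free config package and the Spark-aware utils. A standalone sketch of that contract; the explicit fallback to "MEMORY_AND_DISK_SER" mirrors DEFAULT_WRITE_STATUS_STORAGE_LEVEL and is added here only so the snippet works without the config builder having installed defaults:

    import java.util.Properties;
    import org.apache.spark.storage.StorageLevel;

    public final class WriteStatusLevelSketch {
      public static StorageLevel resolve(Properties props) {
        // The builder normally installs this default; the fallback keeps the sketch self-contained.
        String name = props.getProperty("hoodie.write.status.storage.level", "MEMORY_AND_DISK_SER");
        return StorageLevel.fromString(name);
      }
    }
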
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index b2166d0..35ac526 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -69,7 +70,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
 
     // Step 0: cache the input record RDD
     if (config.getBloomIndexUseCaching()) {
-      recordRDD.persist(config.getBloomIndexInputStorageLevel());
+      recordRDD.persist(SparkConfigUtils.getBloomIndexInputStorageLevel(config.getProps()));
     }
 
     // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index bfd40ba..89ed64d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.index.hbase;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -348,7 +349,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
     JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
     // caching the index updated status RDD
-    writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
+    writeStatusJavaRDD = writeStatusJavaRDD.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps()));
     return writeStatusJavaRDD;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 2affeca..3a81340 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -146,7 +147,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
   private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
     try {
       // Load the new records in a map
-      long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
+      long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
       LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
       this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
           new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
index 8f46b3a..4b09242 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.compact;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.utils.SparkConfigUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -115,13 +116,14 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
         .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
             HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterCompletedInstants().lastInstant().get().getTimestamp();
-    LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
+    LOG.info("MaxMemoryPerCompaction => " + SparkConfigUtils.getMaxMemoryPerCompaction(config.getProps()));
 
     List<String> logFiles = operation.getDeltaFileNames().stream().map(
         p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
        .collect(toList());
+    long maxMemoryPerCompaction = SparkConfigUtils.getMaxMemoryPerCompaction(config.getProps());
     HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
-        readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
+        readerSchema, maxInstantTime, maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
         config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
         config.getSpillableMapBasePath());
     if (!scanner.iterator().hasNext()) {
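
[Editor's note] Tracing the merge path end to end: the Spark-derived budget computed by SparkConfigUtils directly caps the in-heap share of the spillable map that buffers incoming records, with overflow spilling to disk. A fragment mirroring HoodieMergeHandle.init() after this change; config (a HoodieWriteConfig) and the Avro schema are assumed to be in scope:

    long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
    Map<String, HoodieRecord<T>> keyToNewRecords = new ExternalSpillableMap<>(
        memoryForMerge,                   // in-heap byte budget derived from executor memory and fractions
        config.getSpillableMapBasePath(), // directory where the overflow portion spills
        new DefaultSizeEstimator(),       // estimates key sizes
        new HoodieRecordSizeEstimator(schema)); // estimates record sizes against the schema
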