This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d8d006a7555 [HUDI-6874] Move configs for reading a file group to
hudi-common module (#9746)
d8d006a7555 is described below
commit d8d006a755561c09a58ecb143f62a9c2fcacf098
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Sep 21 07:56:56 2023 -0700
[HUDI-6874] Move configs for reading a file group to hudi-common module
(#9746)
This commit moves log merging and memory-related configs to `hudi-common`
module so the common file group read can use them. Configs in
`HoodieRealtimeConfig` are removed if duplicate, or merged into existing configs.
Two config names are renamed for simplification:
- `compaction.memory.fraction` -> `hoodie.memory.compaction.fraction`
- `compaction.lazy.block.read.enabled` ->
`hoodie.compaction.lazy.block.read`
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 10 +--
.../cli/commands/TestHoodieLogFileCommand.java | 10 +--
.../apache/hudi/config/HoodieCompactionConfig.java | 96 ++++++++++++----------
.../org/apache/hudi/config/HoodieWriteConfig.java | 8 +-
.../src/main/java/org/apache/hudi/io/IOUtils.java | 12 +--
.../GenericRecordValidationTestUtils.java | 4 +-
.../MultipleSparkJobExecutionStrategy.java | 8 +-
.../java/org/apache/hudi/io/TestSparkIOUtils.java | 6 +-
.../table/action/compact/TestHoodieCompactor.java | 2 +-
.../apache/hudi/common/config/ConfigGroups.java | 1 +
.../hudi/common}/config/HoodieMemoryConfig.java | 28 ++++---
.../hudi/common/config/HoodieReaderConfig.java | 54 ++++++++++++
.../org/apache/hudi/common/util/ConfigUtils.java | 44 ++++++++++
.../apache/hudi/common/util/TestConfigUtils.java | 42 ++++++++++
.../hudi/sink/compact/FlinkCompactionConfig.java | 2 +-
.../org/apache/hudi/table/format/FormatUtils.java | 56 +++++++++++--
.../org/apache/hudi/util/FlinkWriteClients.java | 2 +-
.../apache/hudi/table/format/TestFormatUtils.java | 67 +++++++++++++++
.../hudi/hadoop/config/HoodieRealtimeConfig.java | 46 -----------
.../realtime/HoodieMergeOnReadSnapshotReader.java | 29 ++++---
.../realtime/RealtimeCompactedRecordReader.java | 18 ++--
.../realtime/RealtimeUnmergedRecordReader.java | 23 ++++--
.../utils/HoodieRealtimeRecordReaderUtils.java | 9 +-
.../TestHoodieMergeOnReadSnapshotReader.java | 4 +-
.../realtime/TestHoodieRealtimeRecordReader.java | 4 +-
.../reader/DFSHoodieDatasetInputReader.java | 6 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 24 +++---
.../ShowHoodieLogFileRecordsProcedure.scala | 10 +--
.../apache/hudi/functional/TestMORDataSource.scala | 10 ++-
29 files changed, 435 insertions(+), 200 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 58eff5f7b31..07291ab8609 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -23,6 +23,8 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -41,8 +43,6 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.hadoop.CachingPath;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -240,17 +240,17 @@ public class HoodieLogFileCommand {
.getCommitTimeline().lastInstant().get().getTimestamp())
.withReadBlocksLazily(
Boolean.parseBoolean(
-
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
+
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
.withReverseReader(
Boolean.parseBoolean(
-
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
+
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
.withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
for (HoodieRecord hoodieRecord : scanner) {
Option<HoodieAvroIndexedRecord> record =
hoodieRecord.toIndexedRecord(readerSchema, new Properties());
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 0f796c8195a..12de150f2d3 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -27,6 +27,8 @@ import
org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -42,8 +44,6 @@ import
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -242,15 +242,15 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withReadBlocksLazily(
Boolean.parseBoolean(
-
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
+
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
.withReverseReader(
Boolean.parseBoolean(
-
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
+
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
Iterator<HoodieRecord> records = scanner.iterator();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 19e2678c8ae..e77308f344b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -34,6 +34,8 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+
/**
* Compaction related config.
*/
@@ -146,21 +148,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "compaction during each compaction run. By default. Hudi picks the
log file "
+ "with most accumulated unmerged data");
- public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE
= ConfigProperty
- .key("hoodie.compaction.lazy.block.read")
- .defaultValue("true")
- .markAdvanced()
- .withDocumentation("When merging the delta log files, this config helps
to choose whether the log blocks "
- + "should be read lazily or not. Choose true to use lazy block
reading (low memory usage, but incurs seeks to each block"
- + " header) or false for immediate block read (higher memory
usage)");
-
- public static final ConfigProperty<String>
COMPACTION_REVERSE_LOG_READ_ENABLE = ConfigProperty
- .key("hoodie.compaction.reverse.log.read")
- .defaultValue("false")
- .markAdvanced()
- .withDocumentation("HoodieLogFormatReader reads a logfile in the forward
direction starting from pos=0 to pos=file_length. "
- + "If this config is set to true, the reader reads the logfile in
reverse direction, from pos=file_length to pos=0");
-
public static final ConfigProperty<String>
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
.key("hoodie.compaction.daybased.target.partitions")
.defaultValue("10")
@@ -204,24 +191,24 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Log compaction can be scheduled if the no. of log
blocks crosses this threshold value. "
+ "This is effective only when log compaction is enabled via " +
INLINE_LOG_COMPACT.key());
- public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN
= ConfigProperty
- .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
- .defaultValue("false")
- .markAdvanced()
- .sinceVersion("0.13.0")
- .withDocumentation("New optimized scan for log blocks that handles all
multi-writer use-cases while appending to log files. "
- + "It also differentiates original blocks written by ingestion
writers and compacted blocks written log compaction.");
-
- /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT} and its methods instead
+ */
@Deprecated
public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
- /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its
methods instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods
instead
+ */
@Deprecated
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP =
INLINE_COMPACT_NUM_DELTA_COMMITS.key();
- /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its
methods instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its
methods instead
+ */
@Deprecated
public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP =
INLINE_COMPACT_TIME_DELTA_SECONDS.key();
- /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods
instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods
instead
+ */
@Deprecated
public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP =
INLINE_COMPACT_TRIGGER_STRATEGY.key();
/**
@@ -289,39 +276,59 @@ public class HoodieCompactionConfig extends HoodieConfig {
*/
@Deprecated
public static final String COMPACTION_STRATEGY_PROP =
COMPACTION_STRATEGY.key();
- /** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */
+ /**
+ * @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead
+ */
@Deprecated
public static final String DEFAULT_COMPACTION_STRATEGY =
COMPACTION_STRATEGY.defaultValue();
- /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its
methods instead */
+ /**
+ * @deprecated Use {@link
HoodieReaderConfig#COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead
+ */
@Deprecated
- public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP =
COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
- /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its
methods instead */
+ public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP =
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
+ /**
+ * @deprecated Use {@link
HoodieReaderConfig#COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead
+ */
@Deprecated
- public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED =
COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
- /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its
methods instead */
+ public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED =
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue();
+ /**
+ * @deprecated Use {@link
HoodieReaderConfig#COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead
+ */
@Deprecated
- public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP =
COMPACTION_REVERSE_LOG_READ_ENABLE.key();
- /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its
methods instead */
+ public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP =
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.key();
+ /**
+ * @deprecated Use {@link
HoodieReaderConfig#COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead
+ */
@Deprecated
- public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED =
COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
+ public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED =
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
/**
* @deprecated Use {@link #INLINE_COMPACT} and its methods instead
*/
@Deprecated
private static final String DEFAULT_INLINE_COMPACT =
INLINE_COMPACT.defaultValue();
- /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its
methods instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods
instead
+ */
@Deprecated
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS =
INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue();
- /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its
methods instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its
methods instead
+ */
@Deprecated
private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS =
INLINE_COMPACT_TIME_DELTA_SECONDS.defaultValue();
- /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods
instead */
+ /**
+ * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods
instead
+ */
@Deprecated
private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY =
INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue();
- /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and
its methods instead */
+ /**
+ * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and
its methods instead
+ */
@Deprecated
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP =
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key();
- /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and
its methods instead */
+ /**
+ * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and
its methods instead
+ */
@Deprecated
public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION
= TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue();
@@ -415,12 +422,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
}
public Builder withCompactionLazyBlockReadEnabled(Boolean
compactionLazyBlockReadEnabled) {
- compactionConfig.setValue(COMPACTION_LAZY_BLOCK_READ_ENABLE,
String.valueOf(compactionLazyBlockReadEnabled));
+
compactionConfig.setValue(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE,
String.valueOf(compactionLazyBlockReadEnabled));
return this;
}
public Builder withCompactionReverseLogReadEnabled(Boolean
compactionReverseLogReadEnabled) {
- compactionConfig.setValue(COMPACTION_REVERSE_LOG_READ_ENABLE,
String.valueOf(compactionReverseLogReadEnabled));
+
compactionConfig.setValue(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE,
String.valueOf(compactionReverseLogReadEnabled));
return this;
}
@@ -456,6 +463,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
public HoodieCompactionConfig build() {
compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
+ compactionConfig.setDefaults(HoodieReaderConfig.class.getName());
return compactionConfig;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2b88699f3f4..10e6967e204 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -27,8 +27,10 @@ import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetaserverConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
import org.apache.hudi.common.config.TypedProperties;
@@ -1458,7 +1460,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public boolean enableOptimizedLogBlocksScan() {
- return getBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
+ return getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}
public HoodieCleaningPolicy getCleanerPolicy() {
@@ -1590,11 +1592,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public Boolean getCompactionLazyBlockReadEnabled() {
- return
getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
+ return getBoolean(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
}
public Boolean getCompactionReverseLogReadEnabled() {
- return
getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
+ return getBoolean(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
}
public int getArchiveDeleteParallelism() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index 7a08305d2df..5db71ee67a7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -23,12 +23,12 @@ import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.util.Option;
-import static
org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-import static
org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-import static
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
-import static
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
public class IOUtils {
/**
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 2196b6f0b63..e542fb481b1 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -19,11 +19,11 @@
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.Schema;
@@ -94,7 +94,7 @@ public class GenericRecordValidationTestUtils {
.collect(Collectors.toList());
jobConf.set(String.format(HOODIE_CONSUME_COMMIT, config.getTableName()),
instant1);
- jobConf.set(HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, "true");
+ jobConf.set(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
"true");
List<GenericRecord> records =
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
hadoopConf, fullPartitionPaths, config.getBasePath(), jobConf, true);
Map<String, GenericRecord> prevRecordsMap = records.stream()
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index c6a1df9105e..ccdad98fc8e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -431,9 +432,10 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
Path[] paths;
if (hasLogFiles) {
- String compactionFractor =
Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
- .orElse("0.75");
- params.put("compaction.memory.fraction", compactionFractor);
+ String rawFractionConfig =
getWriteConfig().getString(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION);
+ String compactionFractor = rawFractionConfig != null
+ ? rawFractionConfig :
HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION;
+ params.put(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(),
compactionFractor);
Path[] deltaPaths = clusteringOps
.stream()
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
index 7490a4d337a..0a97e60d43c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
@@ -19,14 +19,14 @@
package org.apache.hudi.io;
import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import static
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
-import static
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestSparkIOUtils {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 3fd09d5704f..fb4924f97f8 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index c79d3711c5a..daba6f9203e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -34,6 +34,7 @@ public class ConfigGroups {
SPARK_DATASOURCE("Spark Datasource Configs"),
FLINK_SQL("Flink Sql Configs"),
WRITE_CLIENT("Write Client Configs"),
+ READER("Reader Configs"),
META_SYNC("Metastore and Catalog Sync Configs"),
METRICS("Metrics Configs"),
RECORD_PAYLOAD("Record Payload Config"),
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
similarity index 91%
rename from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
index 175228a3ced..34e06abec63 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
@@ -7,22 +7,18 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.config;
+package org.apache.hudi.common.config;
-import org.apache.hudi.common.config.ConfigClassProperty;
-import org.apache.hudi.common.config.ConfigGroups;
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.FileIOUtils;
import javax.annotation.concurrent.Immutable;
@@ -32,7 +28,6 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
-
/**
* Memory related config.
*/
@@ -62,6 +57,9 @@ public class HoodieMemoryConfig extends HoodieConfig {
+ "OOM in the Scanner. Hence, a spillable map helps alleviate the
memory pressure. Use this config to "
+ "set the max allowable inMemory footprint of the spillable map");
+ // Maximum fraction of mapper/reducer task memory to use for compaction of
log files in MR
+ public static final String DEFAULT_MR_COMPACTION_MEMORY_FRACTION = "0.75";
+
// Default memory size (1GB) per compaction (used if SparkEnv is absent),
excess spills to disk
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES =
HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
// Minimum memory size (100MB) for the spillable map.
@@ -77,6 +75,10 @@ public class HoodieMemoryConfig extends HoodieConfig {
public static final ConfigProperty<Integer> MAX_DFS_STREAM_BUFFER_SIZE =
HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
+ // Default value for MR
+ // Setting this to lower value of 1 MB since no control over how many
RecordReaders will be started in a mapper
+ public static final int DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024;
// 1 MB
+
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH =
ConfigProperty
.key("hoodie.memory.spillable.map.path")
.noDefaultValue()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
new file mode 100644
index 00000000000..d25d1898ff2
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configurations for reading a file group
+ */
+@Immutable
+@ConfigClassProperty(name = "Reader Configs",
+ groupName = ConfigGroups.Names.READER,
+ description = "Configurations that control file group reading.")
+public class HoodieReaderConfig {
+  public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
+      .key("hoodie.compaction.lazy.block.read")
+      .defaultValue("true")
+      .markAdvanced()
+      .withDocumentation("When merging the delta log files, this config helps to choose whether the log blocks "
+          + "should be read lazily or not. Choose true to use lazy block reading (low memory usage, but incurs seeks to each block"
+          + " header) or false for immediate block read (higher memory usage)");
+
+  public static final ConfigProperty<String> COMPACTION_REVERSE_LOG_READ_ENABLE = ConfigProperty
+      .key("hoodie.compaction.reverse.log.read")
+      .defaultValue("false")
+      .markAdvanced()
+      .withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
+          + "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
+
+  public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
+      .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("0.13.0")
+      .withDocumentation("New optimized scan for log blocks that handles all multi-writer use-cases while appending to log files. "
+          + "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 0b95bbabcb3..9e0655d6734 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -290,6 +290,32 @@ public class ConfigUtils {
return Option.empty();
}
+  /**
+   * Gets the raw value for a {@link ConfigProperty} config from Hadoop configuration. The key and
+   * alternative keys are used to fetch the config.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return {@link Option} of value if the config exists; empty {@link Option} otherwise.
+   */
+  public static Option<String> getRawValueWithAltKeys(Configuration conf,
+                                                      ConfigProperty<?> configProperty) {
+    String value = conf.get(configProperty.key());
+    if (value != null) {
+      return Option.of(value);
+    }
+    for (String alternative : configProperty.getAlternatives()) {
+      String altValue = conf.get(alternative);
+      if (altValue != null) {
+        LOG.warn(String.format("The configuration key '%s' has been deprecated "
+                + "and may be removed in the future. Please use the new key '%s' instead.",
+            alternative, configProperty.key()));
+        return Option.of(altValue);
+      }
+    }
+    return Option.empty();
+  }
+
+
/**
* Gets the String value for a {@link ConfigProperty} config from
properties. The key and
* alternative keys are used to fetch the config. If the config is not
found, an
@@ -416,6 +442,24 @@ public class ConfigUtils {
return rawValue.map(v ->
Boolean.parseBoolean(v.toString())).orElse(defaultValue);
}
+  /**
+   * Gets the boolean value for a {@link ConfigProperty} config from Hadoop configuration. The key and
+   * alternative keys are used to fetch the config. The default value of {@link ConfigProperty}
+   * config, if exists, is returned if the config is not found in the configuration.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return boolean value if the config exists; default boolean value if the config does not exist
+   * and there is default value defined in the {@link ConfigProperty} config; {@code false} otherwise.
+   */
+  public static boolean getBooleanWithAltKeys(Configuration conf,
+                                              ConfigProperty<?> configProperty) {
+    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
+    boolean defaultValue = configProperty.hasDefaultValue()
+        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : false;
+    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
+  }
+
/**
* Gets the integer value for a {@link ConfigProperty} config from
properties. The key and
* alternative keys are used to fetch the config. The default value of
{@link ConfigProperty}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 1f959ba1b58..ff9ce5c73c6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -19,6 +19,9 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@@ -28,6 +31,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestConfigUtils {
+ public static final ConfigProperty<String> TEST_BOOLEAN_CONFIG_PROPERTY =
ConfigProperty
+ .key("hoodie.test.boolean.config")
+ .defaultValue("true")
+ .withAlternatives("hudi.test.boolean.config")
+ .markAdvanced()
+ .withDocumentation("Testing boolean config.");
@Test
public void testToMapSucceeds() {
@@ -67,4 +76,37 @@ public class TestConfigUtils {
String srcKv = "k.1.1.2=v1=v1.1\nk.2.1.2=v2\nk.3.1.2=v3";
assertThrows(IllegalArgumentException.class, () ->
ConfigUtils.toMap(srcKv));
}
+
+ @Test
+ public void testGetRawValueWithAltKeysFromHadoopConf() {
+ Configuration conf = new Configuration();
+ assertEquals(Option.empty(), ConfigUtils.getRawValueWithAltKeys(conf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ boolean setValue =
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+ conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+ assertEquals(Option.of(String.valueOf(setValue)),
+ ConfigUtils.getRawValueWithAltKeys(conf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ conf = new Configuration();
+ conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0),
setValue);
+ assertEquals(Option.of(String.valueOf(setValue)),
+ ConfigUtils.getRawValueWithAltKeys(conf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+ }
+
+ @Test
+ public void testGetBooleanWithAltKeysFromHadoopConf() {
+ Configuration conf = new Configuration();
+
assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()),
+ ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ boolean setValue =
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+ conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+ assertEquals(setValue,
+ ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ conf = new Configuration();
+ conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0),
setValue);
+ assertEquals(setValue,
+ ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+ }
}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index e783fd9cc8f..813a293e2c4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -18,10 +18,10 @@
package org.apache.hudi.sink.compact;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index f408ae316eb..84f50593e9e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,7 +18,9 @@
package org.apache.hudi.table.format;
-import java.util.stream.Collectors;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
@@ -32,12 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.FlinkWriteClients;
@@ -59,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Utilities for format.
@@ -202,13 +204,12 @@ public class FormatUtils {
         .withInternalSchema(internalSchema)
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
-            string2Boolean(
-                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                    HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+            getBooleanWithAltKeys(flinkConf,
+                HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
         .withReverseReader(false)
         .withBufferSize(
-            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-                HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+            flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+                HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
         .withInstantRange(split.getInstantRange())
         .withRecordMerger(merger);
@@ -276,6 +277,45 @@ public class FormatUtils {
.build();
}
+  /**
+   * Gets the raw value for a {@link ConfigProperty} config from Flink configuration. The key and
+   * alternative keys are used to fetch the config.
+   *
+   * @param flinkConf      Configs in Flink {@link org.apache.flink.configuration.Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return {@link Option} of value if the config exists; empty {@link Option} otherwise.
+   */
+  public static Option<String> getRawValueWithAltKeys(org.apache.flink.configuration.Configuration flinkConf,
+                                                      ConfigProperty<?> configProperty) {
+    if (flinkConf.containsKey(configProperty.key())) {
+      return Option.ofNullable(flinkConf.getString(configProperty.key(), ""));
+    }
+    for (String alternative : configProperty.getAlternatives()) {
+      if (flinkConf.containsKey(alternative)) {
+        return Option.ofNullable(flinkConf.getString(alternative, ""));
+      }
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Gets the boolean value for a {@link ConfigProperty} config from Flink configuration. The key and
+   * alternative keys are used to fetch the config. The default value of {@link ConfigProperty}
+   * config, if exists, is returned if the config is not found in the configuration.
+   *
+   * @param conf           Configs in Flink {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return boolean value if the config exists; default boolean value if the config does not exist
+   * and there is default value defined in the {@link ConfigProperty} config; {@code false} otherwise.
+   */
+  public static boolean getBooleanWithAltKeys(org.apache.flink.configuration.Configuration conf,
+                                              ConfigProperty<?> configProperty) {
+    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
+    boolean defaultValue = configProperty.hasDefaultValue()
+        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : false;
+    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
+  }
+
private static Boolean string2Boolean(String s) {
return "true".equals(s.toLowerCase(Locale.ROOT));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 6d8b0d0a7d6..49619321d01 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
@@ -34,7 +35,6 @@ import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
new file mode 100644
index 00000000000..711c694614b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.hudi.common.util.TestConfigUtils.TEST_BOOLEAN_CONFIG_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link FormatUtils}
+ */
+public class TestFormatUtils {
+ @Test
+ public void testGetRawValueWithAltKeys() {
+ Configuration flinkConf = new Configuration();
+ assertEquals(Option.empty(),
+ FormatUtils.getRawValueWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ boolean setValue =
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+ flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+ assertEquals(Option.of(String.valueOf(setValue)),
+ FormatUtils.getRawValueWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ flinkConf = new Configuration();
+
flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0),
setValue);
+ assertEquals(Option.of(String.valueOf(setValue)),
+ FormatUtils.getRawValueWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+ }
+
+ @Test
+ public void testGetBooleanWithAltKeys() {
+ Configuration flinkConf = new Configuration();
+
assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()),
+ FormatUtils.getBooleanWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ boolean setValue =
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+ flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+ assertEquals(setValue,
+ FormatUtils.getBooleanWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+ flinkConf = new Configuration();
+
flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0),
setValue);
+ assertEquals(setValue,
+ FormatUtils.getBooleanWithAltKeys(flinkConf,
TEST_BOOLEAN_CONFIG_PROPERTY));
+ }
+}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
deleted file mode 100644
index a9688fed5b8..00000000000
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.config;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-
-/**
- * Class to hold props related to Hoodie RealtimeInputFormat and
RealtimeRecordReader.
- */
-public final class HoodieRealtimeConfig {
-
- // Fraction of mapper/reducer task memory used for compaction of log files
- public static final String COMPACTION_MEMORY_FRACTION_PROP =
"compaction.memory.fraction";
- public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
- // used to choose a trade off between IO vs Memory when performing
compaction process
- // Depending on outputfile size and memory provided, choose true to avoid
OOM for large file
- // size + small memory
- public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP =
"compaction.lazy.block.read.enabled";
- public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED =
"true";
-
- // Property to set the max memory for dfs inputstream buffer size
- public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP =
"hoodie.memory.dfs.buffer.max.size";
- // Setting this to lower value of 1 MB since no control over how many
RecordReaders will be started in a mapper
- public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; //
1 MB
- // Property to set file path prefix for spillable file
- public static final String SPILLABLE_MAP_BASE_PATH_PROP =
"hoodie.memory.spillable.map.path";
- // Default file path prefix for spillable file
- public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
- public static final String ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN =
- "hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN;
-}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
index 1cc8bf91b25..a4dc9b0cc67 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -48,13 +50,11 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP;
-import static
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
import static
org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
@@ -117,7 +117,8 @@ public class HoodieMergeOnReadSnapshotReader extends
AbstractRealtimeRecordReade
Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
this.mergedRecordsByKey = new ExternalSpillableMap<>(
getMaxCompactionMemoryInBytes(jobConf),
- jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP,
DEFAULT_SPILLABLE_MAP_BASE_PATH),
+ jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
+ FileIOUtils.getDefaultSpillableMapBasePath()),
new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema),
jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue()),
@@ -185,13 +186,17 @@ public class HoodieMergeOnReadSnapshotReader extends
AbstractRealtimeRecordReade
.withReaderSchema(readerSchema)
.withLatestInstantTime(latestInstantTime)
.withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
-
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReadBlocksLazily(
+ ConfigUtils.getBooleanWithAltKeys(jobConf,
COMPACTION_LAZY_BLOCK_READ_ENABLE))
.withReverseReader(false)
- .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP,
DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
- .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP,
DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
+ FileIOUtils.getDefaultSpillableMapBasePath()))
.withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
.withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN,
false))
+
.withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
+
Boolean.parseBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
.build();
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 2a271203d77..ba9c2b9a5a7 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -20,15 +20,18 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -90,14 +93,19 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
.withReaderSchema(getLogScannerReaderSchema())
.withLatestInstantTime(split.getMaxCommitTime())
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
-
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReadBlocksLazily(
+ ConfigUtils.getBooleanWithAltKeys(jobConf,
+ HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
.withReverseReader(false)
-
.withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
.withBufferSize(jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+
.withSpillableMapBasePath(jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+ FileIOUtils.getDefaultSpillableMapBasePath()))
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
.withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN,
false))
+
.withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
+
Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
.build();
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index a40519df92d..82283807c1d 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -18,8 +18,11 @@
package org.apache.hudi.hadoop.realtime;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
@@ -29,7 +32,6 @@ import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.generic.GenericRecord;
@@ -76,14 +78,17 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader
HoodieUnMergedLogRecordScanner.Builder scannerBuilder =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withFileSystem(FSUtils.getFs(split.getPath().toString(),
this.jobConf))
- .withBasePath(split.getBasePath())
- .withLogFilePaths(split.getDeltaLogPaths())
- .withReaderSchema(getReaderSchema())
- .withLatestInstantTime(split.getMaxCommitTime())
-
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
- .withReverseReader(false)
-
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
+ .withFileSystem(FSUtils.getFs(split.getPath().toString(),
this.jobConf))
+ .withBasePath(split.getBasePath())
+ .withLogFilePaths(split.getDeltaLogPaths())
+ .withReaderSchema(getReaderSchema())
+ .withLatestInstantTime(split.getMaxCommitTime())
+ .withReadBlocksLazily(
+ ConfigUtils.getBooleanWithAltKeys(jobConf,
+ HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
+ .withReverseReader(false)
+
.withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE));
this.executor = new BoundedInMemoryExecutor<>(
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
getParallelProducers(scannerBuilder),
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index a6d1cf66acb..4cb71d442f2 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,10 +18,11 @@
package org.apache.hudi.hadoop.utils;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -74,8 +75,10 @@ public class HoodieRealtimeRecordReaderUtils {
public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {
// jobConf.getMemoryForMapTask() returns in MB
return (long) Math
-
.ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
- HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
+ .ceil(Double.parseDouble(
+ ConfigUtils.getRawValueWithAltKeys(
+ jobConf,
HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION)
+
.orElse(HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION))
* jobConf.getMemoryForMapTask() * 1024 * 1024L);
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index adee06cc20d..87ac14f40c8 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -36,7 +37,6 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.avro.Schema;
@@ -84,7 +84,7 @@ public class TestHoodieMergeOnReadSnapshotReader {
hadoopConf.set("fs.defaultFS", "file:///");
hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(hadoopConf);
- baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
String.valueOf(1024 * 1024));
+ baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
String.valueOf(1024 * 1024));
baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
fs = getFs(basePath.toUri().toString(), baseJobConf);
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 1dbcc157c1e..4bef91daaa5 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -43,7 +44,6 @@ import
org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.RealtimeFileStatus;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -114,7 +114,7 @@ public class TestHoodieRealtimeRecordReader {
hadoopConf.set("fs.defaultFS", "file:///");
hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
baseJobConf = new JobConf(hadoopConf);
- baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
String.valueOf(1024 * 1024));
+ baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
String.valueOf(1024 * 1024));
fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf);
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index a2716d0e73a..555e340e44a 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -21,7 +21,9 @@ package org.apache.hudi.integ.testsuite.reader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -39,8 +41,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -292,7 +292,7 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
// readAvro log files
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 054fcc799d7..a2ad13ada2e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -27,16 +27,15 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.LogFileIterator._
-import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.fs.FSUtils.{buildInlineConf,
getRelativePartitionPath}
+import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.util.HoodieRecordUtils
+import org.apache.hudi.common.util.{ConfigUtils, FileIOUtils,
HoodieRecordUtils}
import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.internal.schema.InternalSchema
import
org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
@@ -52,7 +51,6 @@ import java.io.Closeable
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.util.Try
/**
* Provided w/ list of log files, iterates over all of the records stored in
@@ -306,7 +304,8 @@ object LogFileIterator extends SparkAdapterSupport {
if (HoodieTableMetadata.isMetadataTable(tablePath)) {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(tableState.metadataConfig.getProps)
-
.withSpillableMapDir(hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
.withSpillableMapDir(hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+ FileIOUtils.getDefaultSpillableMapBasePath))
.enable(true).build()
val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val metadataTable = new HoodieBackedTableMetadata(
@@ -341,18 +340,17 @@ object LogFileIterator extends SparkAdapterSupport {
// entailing that table has to have at least one commit
.withLatestInstantTime(tableState.latestCommitTimestamp.get)
.withReadBlocksLazily(
-
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
- .getOrElse(false))
+ ConfigUtils.getBooleanWithAltKeys(hadoopConf,
+ HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
.withReverseReader(false)
.withInternalSchema(internalSchema)
.withBufferSize(
-
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
- HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+
hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
- hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
- HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
+ FileIOUtils.getDefaultSpillableMapBasePath))
.withDiskMapType(
hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index cca1fd1da0d..24f80fa2502 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -19,22 +19,20 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat,
HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.{FileIOUtils, ValidationUtils}
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
import java.util.Objects
import java.util.function.Supplier
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-
import scala.collection.JavaConverters._
class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with
ProcedureBuilder {
@@ -71,8 +69,8 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
.withLogFilePaths(logFilePaths.asJava)
.withReaderSchema(schema)
.withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
-
.withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
-
.withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
+
.withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
+
.withReverseReader(java.lang.Boolean.parseBoolean(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
.withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 2ea66fa3f07..81915953252 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.client.SparkRDDWriteClient
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
-import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieMemoryConfig,
HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,7 +31,6 @@ import
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils,
HoodieSparkClientTestBase}
@@ -338,13 +337,16 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.save(basePath)
// Make force spill
-
spark.sparkContext.hadoopConfiguration.set(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
"0.00001")
+ spark.sparkContext.hadoopConfiguration.set(
+ HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key, "0.00001")
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF1.count()) // still 100, since we only
updated
-
spark.sparkContext.hadoopConfiguration.set(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION)
+ spark.sparkContext.hadoopConfiguration.set(
+ HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key,
+ HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION)
}
@ParameterizedTest