This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d006a7555 [HUDI-6874] Move configs for reading a file group to 
hudi-common module (#9746)
d8d006a7555 is described below

commit d8d006a755561c09a58ecb143f62a9c2fcacf098
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Sep 21 07:56:56 2023 -0700

    [HUDI-6874] Move configs for reading a file group to hudi-common module (#9746)
    
    This commit moves the log-merging and memory-related configs to the
    `hudi-common` module so that the common file group reader can use them.
    Configs in `HoodieRealtimeConfig` are removed if duplicated, or merged
    into existing configs. Two config names are renamed for simplification:
    - `compaction.memory.fraction` -> `hoodie.memory.compaction.fraction`
    - `compaction.lazy.block.read.enabled` -> `hoodie.compaction.lazy.block.read`
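    
    For jobs that set these options directly, only the key strings change;
    the default values are unchanged. A minimal migration sketch using
    `TypedProperties` (the class and method names below are illustrative):
    
        import org.apache.hudi.common.config.TypedProperties;
    
        public class ReaderConfigKeyMigration {
          public static TypedProperties migratedProps() {
            TypedProperties props = new TypedProperties();
            // Before this commit: "compaction.memory.fraction"
            props.setProperty("hoodie.memory.compaction.fraction", "0.75");
            // Before this commit: "compaction.lazy.block.read.enabled"
            props.setProperty("hoodie.compaction.lazy.block.read", "true");
            return props;
          }
        }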
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 10 +--
 .../cli/commands/TestHoodieLogFileCommand.java     | 10 +--
 .../apache/hudi/config/HoodieCompactionConfig.java | 96 ++++++++++++----------
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  8 +-
 .../src/main/java/org/apache/hudi/io/IOUtils.java  | 12 +--
 .../GenericRecordValidationTestUtils.java          |  4 +-
 .../MultipleSparkJobExecutionStrategy.java         |  8 +-
 .../java/org/apache/hudi/io/TestSparkIOUtils.java  |  6 +-
 .../table/action/compact/TestHoodieCompactor.java  |  2 +-
 .../apache/hudi/common/config/ConfigGroups.java    |  1 +
 .../hudi/common}/config/HoodieMemoryConfig.java    | 28 ++++---
 .../hudi/common/config/HoodieReaderConfig.java     | 54 ++++++++++++
 .../org/apache/hudi/common/util/ConfigUtils.java   | 44 ++++++++++
 .../apache/hudi/common/util/TestConfigUtils.java   | 42 ++++++++++
 .../hudi/sink/compact/FlinkCompactionConfig.java   |  2 +-
 .../org/apache/hudi/table/format/FormatUtils.java  | 56 +++++++++++--
 .../org/apache/hudi/util/FlinkWriteClients.java    |  2 +-
 .../apache/hudi/table/format/TestFormatUtils.java  | 67 +++++++++++++++
 .../hudi/hadoop/config/HoodieRealtimeConfig.java   | 46 -----------
 .../realtime/HoodieMergeOnReadSnapshotReader.java  | 29 ++++---
 .../realtime/RealtimeCompactedRecordReader.java    | 18 ++--
 .../realtime/RealtimeUnmergedRecordReader.java     | 23 ++++--
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  9 +-
 .../TestHoodieMergeOnReadSnapshotReader.java       |  4 +-
 .../realtime/TestHoodieRealtimeRecordReader.java   |  4 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  6 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala | 24 +++---
 .../ShowHoodieLogFileRecordsProcedure.scala        | 10 +--
 .../apache/hudi/functional/TestMORDataSource.scala | 10 ++-
 29 files changed, 435 insertions(+), 200 deletions(-)
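
The patch also adds alt-key-aware lookup helpers: `ConfigUtils.getRawValueWithAltKeys`
and `ConfigUtils.getBooleanWithAltKeys` for Hadoop `Configuration`, with Flink
counterparts in `FormatUtils`. A minimal usage sketch of the Hadoop variant, per the
Javadoc in the diff below (the wrapping class here is illustrative):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.hudi.common.util.ConfigUtils;

    public class AltKeyLookupExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key absent: falls back to the ConfigProperty default ("false").
        boolean beforeSet = ConfigUtils.getBooleanWithAltKeys(
            conf, HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
        // Either the primary key or a deprecated alternative is honored;
        // alternatives trigger a deprecation warning in the logs.
        conf.setBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), true);
        boolean afterSet = ConfigUtils.getBooleanWithAltKeys(
            conf, HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
        System.out.println(beforeSet + " -> " + afterSet); // false -> true
      }
    }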

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 58eff5f7b31..07291ab8609 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -23,6 +23,8 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -41,8 +43,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hadoop.CachingPath;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -240,17 +240,17 @@ public class HoodieLogFileCommand {
                       .getCommitTimeline().lastInstant().get().getTimestamp())
               .withReadBlocksLazily(
                   Boolean.parseBoolean(
-                      
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
+                      
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
               .withReverseReader(
                   Boolean.parseBoolean(
-                      
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
+                      
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
               
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
               .withMaxMemorySizeInBytes(
                   
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
               
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
               
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
               
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+              
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
               .build();
       for (HoodieRecord hoodieRecord : scanner) {
         Option<HoodieAvroIndexedRecord> record = 
hoodieRecord.toIndexedRecord(readerSchema, new Properties());
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 0f796c8195a..12de150f2d3 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -27,6 +27,8 @@ import 
org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -42,8 +44,6 @@ import 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -242,15 +242,15 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
             HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
         .withReadBlocksLazily(
             Boolean.parseBoolean(
-                
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
+                
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
         .withReverseReader(
             Boolean.parseBoolean(
-                
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
+                
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
         
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
         .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
         
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+        
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
         .build();
 
     Iterator<HoodieRecord> records = scanner.iterator();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 19e2678c8ae..e77308f344b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -34,6 +34,8 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+
 /**
  * Compaction related config.
  */
@@ -146,21 +148,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "compaction during each compaction run. By default. Hudi picks the 
log file "
           + "with most accumulated unmerged data");
 
-  public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE 
= ConfigProperty
-      .key("hoodie.compaction.lazy.block.read")
-      .defaultValue("true")
-      .markAdvanced()
-      .withDocumentation("When merging the delta log files, this config helps 
to choose whether the log blocks "
-          + "should be read lazily or not. Choose true to use lazy block 
reading (low memory usage, but incurs seeks to each block"
-          + " header) or false for immediate block read (higher memory 
usage)");
-
-  public static final ConfigProperty<String> 
COMPACTION_REVERSE_LOG_READ_ENABLE = ConfigProperty
-      .key("hoodie.compaction.reverse.log.read")
-      .defaultValue("false")
-      .markAdvanced()
-      .withDocumentation("HoodieLogFormatReader reads a logfile in the forward 
direction starting from pos=0 to pos=file_length. "
-          + "If this config is set to true, the reader reads the logfile in 
reverse direction, from pos=file_length to pos=0");
-
   public static final ConfigProperty<String> 
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
       .key("hoodie.compaction.daybased.target.partitions")
       .defaultValue("10")
@@ -204,24 +191,24 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Log compaction can be scheduled if the no. of log 
blocks crosses this threshold value. "
           + "This is effective only when log compaction is enabled via " + 
INLINE_LOG_COMPACT.key());
 
-  public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN 
= ConfigProperty
-      .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
-      .defaultValue("false")
-      .markAdvanced()
-      .sinceVersion("0.13.0")
-      .withDocumentation("New optimized scan for log blocks that handles all 
multi-writer use-cases while appending to log files. "
-          + "It also differentiates original blocks written by ingestion 
writers and compacted blocks written log compaction.");
-
-  /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT} and its methods instead
+   */
   @Deprecated
   public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
-  /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its 
methods instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods 
instead
+   */
   @Deprecated
   public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = 
INLINE_COMPACT_NUM_DELTA_COMMITS.key();
-  /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its 
methods instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its 
methods instead
+   */
   @Deprecated
   public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = 
INLINE_COMPACT_TIME_DELTA_SECONDS.key();
-  /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods 
instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods 
instead
+   */
   @Deprecated
   public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = 
INLINE_COMPACT_TRIGGER_STRATEGY.key();
   /**
@@ -289,39 +276,59 @@ public class HoodieCompactionConfig extends HoodieConfig {
    */
   @Deprecated
   public static final String COMPACTION_STRATEGY_PROP = 
COMPACTION_STRATEGY.key();
-  /** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */
+  /**
+   * @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_COMPACTION_STRATEGY = 
COMPACTION_STRATEGY.defaultValue();
-  /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its 
methods instead */
+  /**
+   * @deprecated Use {@link 
HoodieReaderConfig#COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead
+   */
   @Deprecated
-  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = 
COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
-  /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its 
methods instead */
+  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = 
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
+  /**
+   * @deprecated Use {@link 
HoodieReaderConfig#COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead
+   */
   @Deprecated
-  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = 
COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
-  /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its 
methods instead */
+  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = 
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue();
+  /**
+   * @deprecated Use {@link 
HoodieReaderConfig#COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead
+   */
   @Deprecated
-  public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = 
COMPACTION_REVERSE_LOG_READ_ENABLE.key();
-  /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its 
methods instead */
+  public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = 
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.key();
+  /**
+   * @deprecated Use {@link 
HoodieReaderConfig#COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead
+   */
   @Deprecated
-  public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = 
COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
+  public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = 
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
   /**
    * @deprecated Use {@link #INLINE_COMPACT} and its methods instead
    */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT = 
INLINE_COMPACT.defaultValue();
-  /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its 
methods instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods 
instead
+   */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = 
INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue();
-  /** @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its 
methods instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_TIME_DELTA_SECONDS} and its 
methods instead
+   */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS = 
INLINE_COMPACT_TIME_DELTA_SECONDS.defaultValue();
-  /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods 
instead */
+  /**
+   * @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods 
instead
+   */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = 
INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue();
-  /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and 
its methods instead */
+  /**
+   * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and 
its methods instead
+   */
   @Deprecated
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = 
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key();
-  /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and 
its methods instead */
+  /**
+   * @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and 
its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION 
= TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.defaultValue();
 
@@ -415,12 +422,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder withCompactionLazyBlockReadEnabled(Boolean 
compactionLazyBlockReadEnabled) {
-      compactionConfig.setValue(COMPACTION_LAZY_BLOCK_READ_ENABLE, 
String.valueOf(compactionLazyBlockReadEnabled));
+      
compactionConfig.setValue(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE, 
String.valueOf(compactionLazyBlockReadEnabled));
       return this;
     }
 
     public Builder withCompactionReverseLogReadEnabled(Boolean 
compactionReverseLogReadEnabled) {
-      compactionConfig.setValue(COMPACTION_REVERSE_LOG_READ_ENABLE, 
String.valueOf(compactionReverseLogReadEnabled));
+      
compactionConfig.setValue(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE,
 String.valueOf(compactionReverseLogReadEnabled));
       return this;
     }
 
@@ -456,6 +463,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
 
     public HoodieCompactionConfig build() {
       compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
+      compactionConfig.setDefaults(HoodieReaderConfig.class.getName());
       return compactionConfig;
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2b88699f3f4..10e6967e204 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -27,8 +27,10 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieMetaserverConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -1458,7 +1460,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean enableOptimizedLogBlocksScan() {
-    return getBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
+    return getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
   }
 
   public HoodieCleaningPolicy getCleanerPolicy() {
@@ -1590,11 +1592,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public Boolean getCompactionLazyBlockReadEnabled() {
-    return 
getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
+    return getBoolean(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
   }
 
   public Boolean getCompactionReverseLogReadEnabled() {
-    return 
getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
+    return getBoolean(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
   }
 
   public int getArchiveDeleteParallelism() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index 7a08305d2df..5db71ee67a7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -23,12 +23,12 @@ import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.util.Option;
 
-import static 
org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-import static 
org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-import static 
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static 
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
-import static 
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
 
 public class IOUtils {
   /**
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 2196b6f0b63..e542fb481b1 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -19,11 +19,11 @@
 package org.apache.hudi.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.avro.Schema;
@@ -94,7 +94,7 @@ public class GenericRecordValidationTestUtils {
         .collect(Collectors.toList());
 
     jobConf.set(String.format(HOODIE_CONSUME_COMMIT, config.getTableName()), 
instant1);
-    jobConf.set(HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, "true");
+    jobConf.set(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), 
"true");
     List<GenericRecord> records = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         hadoopConf, fullPartitionPaths, config.getBasePath(), jobConf, true);
     Map<String, GenericRecord> prevRecordsMap = records.stream()
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index c6a1df9105e..ccdad98fc8e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -431,9 +432,10 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
     Path[] paths;
     if (hasLogFiles) {
-      String compactionFractor = 
Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
-          .orElse("0.75");
-      params.put("compaction.memory.fraction", compactionFractor);
+      String rawFractionConfig = 
getWriteConfig().getString(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION);
+      String compactionFractor = rawFractionConfig != null
+          ? rawFractionConfig : 
HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION;
+      params.put(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), 
compactionFractor);
 
       Path[] deltaPaths = clusteringOps
           .stream()
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
index 7490a4d337a..0a97e60d43c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
@@ -19,14 +19,14 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import static 
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
-import static 
org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestSparkIOUtils {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 3fd09d5704f..fb4924f97f8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java 
b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index c79d3711c5a..daba6f9203e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -34,6 +34,7 @@ public class ConfigGroups {
     SPARK_DATASOURCE("Spark Datasource Configs"),
     FLINK_SQL("Flink Sql Configs"),
     WRITE_CLIENT("Write Client Configs"),
+    READER("Reader Configs"),
     META_SYNC("Metastore and Catalog Sync Configs"),
     METRICS("Metrics Configs"),
     RECORD_PAYLOAD("Record Payload Config"),
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
similarity index 91%
rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
rename to 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
index 175228a3ced..34e06abec63 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMemoryConfig.java
@@ -7,22 +7,18 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.config;
+package org.apache.hudi.common.config;
 
-import org.apache.hudi.common.config.ConfigClassProperty;
-import org.apache.hudi.common.config.ConfigGroups;
-import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.util.FileIOUtils;
 
 import javax.annotation.concurrent.Immutable;
@@ -32,7 +28,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
-
 /**
  * Memory related config.
  */
@@ -62,6 +57,9 @@ public class HoodieMemoryConfig extends HoodieConfig {
           + "OOM in the Scanner. Hence, a spillable map helps alleviate the 
memory pressure. Use this config to "
           + "set the max allowable inMemory footprint of the spillable map");
 
+  // Maximum fraction of mapper/reducer task memory to use for compaction of 
log files in MR
+  public static final String DEFAULT_MR_COMPACTION_MEMORY_FRACTION = "0.75";
+
   // Default memory size (1GB) per compaction (used if SparkEnv is absent), 
excess spills to disk
   public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 
HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
   // Minimum memory size (100MB) for the spillable map.
@@ -77,6 +75,10 @@ public class HoodieMemoryConfig extends HoodieConfig {
 
   public static final ConfigProperty<Integer> MAX_DFS_STREAM_BUFFER_SIZE = 
HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
 
+  // Default value for MR
+  // Setting this to lower value of 1 MB since no control over how many 
RecordReaders will be started in a mapper
+  public static final int DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; 
// 1 MB
+
   public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = 
ConfigProperty
       .key("hoodie.memory.spillable.map.path")
       .noDefaultValue()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
new file mode 100644
index 00000000000..d25d1898ff2
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configurations for reading a file group
+ */
+@Immutable
+@ConfigClassProperty(name = "Reader Configs",
+    groupName = ConfigGroups.Names.READER,
+    description = "Configurations that control file group reading.")
+public class HoodieReaderConfig {
+  public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE 
= ConfigProperty
+      .key("hoodie.compaction.lazy.block.read")
+      .defaultValue("true")
+      .markAdvanced()
+      .withDocumentation("When merging the delta log files, this config helps 
to choose whether the log blocks "
+          + "should be read lazily or not. Choose true to use lazy block 
reading (low memory usage, but incurs seeks to each block"
+          + " header) or false for immediate block read (higher memory 
usage)");
+
+  public static final ConfigProperty<String> 
COMPACTION_REVERSE_LOG_READ_ENABLE = ConfigProperty
+      .key("hoodie.compaction.reverse.log.read")
+      .defaultValue("false")
+      .markAdvanced()
+      .withDocumentation("HoodieLogFormatReader reads a logfile in the forward 
direction starting from pos=0 to pos=file_length. "
+          + "If this config is set to true, the reader reads the logfile in 
reverse direction, from pos=file_length to pos=0");
+
+  public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN 
= ConfigProperty
+      .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("0.13.0")
+      .withDocumentation("New optimized scan for log blocks that handles all 
multi-writer use-cases while appending to log files. "
+          + "It also differentiates original blocks written by ingestion 
writers and compacted blocks written log compaction.");
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 0b95bbabcb3..9e0655d6734 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -290,6 +290,32 @@ public class ConfigUtils {
     return Option.empty();
   }
 
+  /**
+   * Gets the raw value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
+   * alternative keys are used to fetch the config.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return {@link Option} of value if the config exists; empty {@link 
Option} otherwise.
+   */
+  public static Option<String> getRawValueWithAltKeys(Configuration conf,
+                                                      ConfigProperty<?> 
configProperty) {
+    String value = conf.get(configProperty.key());
+    if (value != null) {
+      return Option.of(value);
+    }
+    for (String alternative : configProperty.getAlternatives()) {
+      String altValue = conf.get(alternative);
+      if (altValue != null) {
+        LOG.warn(String.format("The configuration key '%s' has been deprecated 
"
+                + "and may be removed in the future. Please use the new key 
'%s' instead.",
+            alternative, configProperty.key()));
+        return Option.of(altValue);
+      }
+    }
+    return Option.empty();
+  }
+
   /**
    * Gets the String value for a {@link ConfigProperty} config from 
properties. The key and
    * alternative keys are used to fetch the config. If the config is not 
found, an
@@ -416,6 +442,24 @@ public class ConfigUtils {
     return rawValue.map(v -> 
Boolean.parseBoolean(v.toString())).orElse(defaultValue);
   }
 
+  /**
+   * Gets the boolean value for a {@link ConfigProperty} config from Hadoop 
configuration. The key and
+   * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
+   * config, if exists, is returned if the config is not found in the 
configuration.
+   *
+   * @param conf           Configs in Hadoop {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return boolean value if the config exists; default boolean value if the 
config does not exist
+   * and there is default value defined in the {@link ConfigProperty} config; 
{@code false} otherwise.
+   */
+  public static boolean getBooleanWithAltKeys(Configuration conf,
+                                              ConfigProperty<?> 
configProperty) {
+    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
+    boolean defaultValue = configProperty.hasDefaultValue()
+        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : 
false;
+    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
+  }
+
   /**
    * Gets the integer value for a {@link ConfigProperty} config from 
properties. The key and
    * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 1f959ba1b58..ff9ce5c73c6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.config.ConfigProperty;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
@@ -28,6 +31,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TestConfigUtils {
+  public static final ConfigProperty<String> TEST_BOOLEAN_CONFIG_PROPERTY = 
ConfigProperty
+      .key("hoodie.test.boolean.config")
+      .defaultValue("true")
+      .withAlternatives("hudi.test.boolean.config")
+      .markAdvanced()
+      .withDocumentation("Testing boolean config.");
 
   @Test
   public void testToMapSucceeds() {
@@ -67,4 +76,37 @@ public class TestConfigUtils {
     String srcKv = "k.1.1.2=v1=v1.1\nk.2.1.2=v2\nk.3.1.2=v3";
     assertThrows(IllegalArgumentException.class, () -> 
ConfigUtils.toMap(srcKv));
   }
+
+  @Test
+  public void testGetRawValueWithAltKeysFromHadoopConf() {
+    Configuration conf = new Configuration();
+    assertEquals(Option.empty(), ConfigUtils.getRawValueWithAltKeys(conf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        ConfigUtils.getRawValueWithAltKeys(conf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    conf = new Configuration();
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        ConfigUtils.getRawValueWithAltKeys(conf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
+
+  @Test
+  public void testGetBooleanWithAltKeysFromHadoopConf() {
+    Configuration conf = new Configuration();
+    
assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()),
+        ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(setValue,
+        ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    conf = new Configuration();
+    conf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(setValue,
+        ConfigUtils.getBooleanWithAltKeys(conf, TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index e783fd9cc8f..813a293e2c4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -18,10 +18,10 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index f408ae316eb..84f50593e9e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.format;
 
-import java.util.stream.Collectors;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
@@ -32,12 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.FlinkWriteClients;
@@ -59,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Utilities for format.
@@ -202,13 +204,12 @@ public class FormatUtils {
           .withInternalSchema(internalSchema)
           .withLatestInstantTime(split.getLatestCommit())
           .withReadBlocksLazily(
-              string2Boolean(
-                  
flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                      
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+              getBooleanWithAltKeys(flinkConf,
+                  HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
           .withReverseReader(false)
           .withBufferSize(
-              
flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-                  HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+              
flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+                  HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
           .withInstantRange(split.getInstantRange())
           .withRecordMerger(merger);
 
@@ -276,6 +277,45 @@ public class FormatUtils {
         .build();
   }
 
+  /**
+   * Gets the raw value for a {@link ConfigProperty} config from Flink 
configuration. The key and
+   * alternative keys are used to fetch the config.
+   *
+   * @param flinkConf      Configs in Flink {@link 
org.apache.flink.configuration.Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return {@link Option} of value if the config exists; empty {@link 
Option} otherwise.
+   */
+  public static Option<String> 
getRawValueWithAltKeys(org.apache.flink.configuration.Configuration flinkConf,
+                                                      ConfigProperty<?> 
configProperty) {
+    if (flinkConf.containsKey(configProperty.key())) {
+      return Option.ofNullable(flinkConf.getString(configProperty.key(), ""));
+    }
+    for (String alternative : configProperty.getAlternatives()) {
+      if (flinkConf.containsKey(alternative)) {
+        return Option.ofNullable(flinkConf.getString(alternative, ""));
+      }
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Gets the boolean value for a {@link ConfigProperty} config from Flink 
configuration. The key and
+   * alternative keys are used to fetch the config. The default value of 
{@link ConfigProperty}
+   * config, if exists, is returned if the config is not found in the 
configuration.
+   *
+   * @param conf           Configs in Flink {@link Configuration}.
+   * @param configProperty {@link ConfigProperty} config to fetch.
+   * @return boolean value if the config exists; default boolean value if the 
config does not exist
+   * and there is default value defined in the {@link ConfigProperty} config; 
{@code false} otherwise.
+   */
+  public static boolean 
getBooleanWithAltKeys(org.apache.flink.configuration.Configuration conf,
+                                              ConfigProperty<?> 
configProperty) {
+    Option<String> rawValue = getRawValueWithAltKeys(conf, configProperty);
+    boolean defaultValue = configProperty.hasDefaultValue()
+        ? Boolean.parseBoolean(configProperty.defaultValue().toString()) : 
false;
+    return rawValue.map(Boolean::parseBoolean).orElse(defaultValue);
+  }
+
   private static Boolean string2Boolean(String s) {
     return "true".equals(s.toLowerCase(Locale.ROOT));
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 6d8b0d0a7d6..49619321d01 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
@@ -34,7 +35,6 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
new file mode 100644
index 00000000000..711c694614b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFormatUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.hudi.common.util.TestConfigUtils.TEST_BOOLEAN_CONFIG_PROPERTY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link FormatUtils}
+ */
+public class TestFormatUtils {
+  @Test
+  public void testGetRawValueWithAltKeys() {
+    Configuration flinkConf = new Configuration();
+    assertEquals(Option.empty(),
+        FormatUtils.getRawValueWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        FormatUtils.getRawValueWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    flinkConf = new Configuration();
+    
flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(Option.of(String.valueOf(setValue)),
+        FormatUtils.getRawValueWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
+
+  @Test
+  public void testGetBooleanWithAltKeys() {
+    Configuration flinkConf = new Configuration();
+    
assertEquals(Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue()),
+        FormatUtils.getBooleanWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    boolean setValue = 
!Boolean.parseBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.defaultValue());
+    flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.key(), setValue);
+    assertEquals(setValue,
+        FormatUtils.getBooleanWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+
+    flinkConf = new Configuration();
+    
flinkConf.setBoolean(TEST_BOOLEAN_CONFIG_PROPERTY.getAlternatives().get(0), 
setValue);
+    assertEquals(setValue,
+        FormatUtils.getBooleanWithAltKeys(flinkConf, 
TEST_BOOLEAN_CONFIG_PROPERTY));
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
deleted file mode 100644
index a9688fed5b8..00000000000
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HoodieRealtimeConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.config;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-
-/**
- * Class to hold props related to Hoodie RealtimeInputFormat and 
RealtimeRecordReader.
- */
-public final class HoodieRealtimeConfig {
-
-  // Fraction of mapper/reducer task memory used for compaction of log files
-  public static final String COMPACTION_MEMORY_FRACTION_PROP = 
"compaction.memory.fraction";
-  public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
-  // used to choose a trade off between IO vs Memory when performing 
compaction process
-  // Depending on outputfile size and memory provided, choose true to avoid 
OOM for large file
-  // size + small memory
-  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = 
"compaction.lazy.block.read.enabled";
-  public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = 
"true";
-
-  // Property to set the max memory for dfs inputstream buffer size
-  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = 
"hoodie.memory.dfs.buffer.max.size";
-  // Setting this to lower value of 1 MB since no control over how many 
RecordReaders will be started in a mapper
-  public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 
1 MB
-  // Property to set file path prefix for spillable file
-  public static final String SPILLABLE_MAP_BASE_PATH_PROP = 
"hoodie.memory.spillable.map.path";
-  // Default file path prefix for spillable file
-  public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
-  public static final String ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN =
-      "hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN;
-}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
index 1cc8bf91b25..a4dc9b0cc67 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -48,13 +50,11 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP;
-import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static org.apache.hudi.common.config.HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE;
+import static org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
 import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
 import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
 import static org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
@@ -117,7 +117,8 @@ public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReade
     Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
     this.mergedRecordsByKey = new ExternalSpillableMap<>(
         getMaxCompactionMemoryInBytes(jobConf),
-        jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH),
+        jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
+            FileIOUtils.getDefaultSpillableMapBasePath()),
         new DefaultSizeEstimator(),
         new HoodieRecordSizeEstimator(readerSchema),
         jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()),
@@ -185,13 +186,17 @@ public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReade
         .withReaderSchema(readerSchema)
         .withLatestInstantTime(latestInstantTime)
         .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
-        .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReadBlocksLazily(
+            ConfigUtils.getBooleanWithAltKeys(jobConf, COMPACTION_LAZY_BLOCK_READ_ENABLE))
         .withReverseReader(false)
-        .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE.key(),
+            DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
+            FileIOUtils.getDefaultSpillableMapBasePath()))
         .withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, false))
+        .withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
+            Boolean.parseBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
         .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
         .build();
   }
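
Note: `ConfigUtils.getBooleanWithAltKeys(jobConf, ...)` above replaces the old
two-argument `jobConf.get(key, default)` lookups. A rough sketch of the
assumed resolution order, with a plain map standing in for the Hadoop
configuration (illustrative only, not the actual Hudi implementation):

    import java.util.List;
    import java.util.Map;

    final class AltKeyLookupSketch {
      // The primary key wins; otherwise the first alternate (legacy) key that
      // is present; otherwise the ConfigProperty default.
      static boolean getBooleanWithAltKeys(Map<String, String> conf, String key,
                                           List<String> altKeys, boolean defaultValue) {
        if (conf.containsKey(key)) {
          return Boolean.parseBoolean(conf.get(key));
        }
        for (String alt : altKeys) {
          if (conf.containsKey(alt)) {
            return Boolean.parseBoolean(conf.get(alt));
          }
        }
        return defaultValue;
      }
    }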
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 2a271203d77..ba9c2b9a5a7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -20,15 +20,18 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -90,14 +93,19 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .withReaderSchema(getLogScannerReaderSchema())
         .withLatestInstantTime(split.getMaxCommitTime())
         .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
-        .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReadBlocksLazily(
+            ConfigUtils.getBooleanWithAltKeys(jobConf,
+                HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
         .withReverseReader(false)
-        .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withBufferSize(jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+            HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+            FileIOUtils.getDefaultSpillableMapBasePath()))
         .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
             HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, false))
+        .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
+            Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
         .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .build();
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index a40519df92d..82283807c1d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
@@ -29,7 +32,6 @@ import org.apache.hudi.common.util.queue.HoodieProducer;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.hadoop.RecordReaderValueIterator;
 import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.avro.generic.GenericRecord;
@@ -76,14 +78,17 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
 
     HoodieUnMergedLogRecordScanner.Builder scannerBuilder =
         HoodieUnMergedLogRecordScanner.newBuilder()
-          .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf))
-          .withBasePath(split.getBasePath())
-          .withLogFilePaths(split.getDeltaLogPaths())
-          .withReaderSchema(getReaderSchema())
-          .withLatestInstantTime(split.getMaxCommitTime())
-          .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
-          .withReverseReader(false)
-          .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
+            .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf))
+            .withBasePath(split.getBasePath())
+            .withLogFilePaths(split.getDeltaLogPaths())
+            .withReaderSchema(getReaderSchema())
+            .withLatestInstantTime(split.getMaxCommitTime())
+            .withReadBlocksLazily(
+                ConfigUtils.getBooleanWithAltKeys(jobConf,
+                    HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
+            .withReverseReader(false)
+            .withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+                HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE));
 
     this.executor = new BoundedInMemoryExecutor<>(
         HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder),
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index a6d1cf66acb..4cb71d442f2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
@@ -74,8 +75,10 @@ public class HoodieRealtimeRecordReaderUtils {
   public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {
     // jobConf.getMemoryForMapTask() returns in MB
     return (long) Math
-        .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
-            HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
+        .ceil(Double.parseDouble(
+            ConfigUtils.getRawValueWithAltKeys(
+                    jobConf, HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION)
+                .orElse(HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION))
             * jobConf.getMemoryForMapTask() * 1024 * 1024L);
   }
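
Note: the refactor above keeps the original budget arithmetic: the configured
fraction times the map task memory (reported in MB), scaled to bytes. A worked
example, assuming `DEFAULT_MR_COMPACTION_MEMORY_FRACTION` preserves the old MR
default of 0.75:

    public class CompactionMemoryBudget {
      public static void main(String[] args) {
        double fraction = 0.75;       // assumed compaction memory fraction
        long mapTaskMemoryMb = 1024;  // jobConf.getMemoryForMapTask() returns MB
        long budgetBytes = (long) Math.ceil(fraction * mapTaskMemoryMb * 1024 * 1024L);
        System.out.println(budgetBytes); // 805306368 bytes, i.e. 768 MB
      }
    }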
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index adee06cc20d..87ac14f40c8 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -36,7 +37,6 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 
 import org.apache.avro.Schema;
@@ -84,7 +84,7 @@ public class TestHoodieMergeOnReadSnapshotReader {
     hadoopConf.set("fs.defaultFS", "file:///");
     hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
     baseJobConf = new JobConf(hadoopConf);
-    baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+    baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024));
     baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
     baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
     fs = getFs(basePath.toUri().toString(), baseJobConf);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 1dbcc157c1e..4bef91daaa5 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -43,7 +44,6 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.RealtimeFileStatus;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
@@ -114,7 +114,7 @@ public class TestHoodieRealtimeRecordReader {
     hadoopConf.set("fs.defaultFS", "file:///");
     hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
     baseJobConf = new JobConf(hadoopConf);
-    baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+    baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), String.valueOf(1024 * 1024));
     fs = FSUtils.getFs(basePath.toUri().toString(), baseJobConf);
   }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index a2716d0e73a..555e340e44a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -21,7 +21,9 @@ package org.apache.hudi.integ.testsuite.reader;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -39,8 +41,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypeUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
@@ -292,7 +292,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
           .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
           .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
           .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-          .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
+          .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
           .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
           .build();
       // readAvro log files
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 054fcc799d7..a2ad13ada2e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -27,16 +27,15 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.LogFileIterator._
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieMetadataConfig, HoodieReaderConfig, TypedProperties}
 import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.fs.FSUtils.{buildInlineConf, getRelativePartitionPath}
+import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.util.HoodieRecordUtils
+import org.apache.hudi.common.util.{ConfigUtils, FileIOUtils, HoodieRecordUtils}
 import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
@@ -52,7 +51,6 @@ import java.io.Closeable
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Try
 
 /**
  * Provided w/ list of log files, iterates over all of the records stored in
@@ -306,7 +304,8 @@ object LogFileIterator extends SparkAdapterSupport {
     if (HoodieTableMetadata.isMetadataTable(tablePath)) {
       val metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(tableState.metadataConfig.getProps)
-        .withSpillableMapDir(hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withSpillableMapDir(hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
+          FileIOUtils.getDefaultSpillableMapBasePath))
         .enable(true).build()
       val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
       val metadataTable = new HoodieBackedTableMetadata(
@@ -341,18 +340,17 @@ object LogFileIterator extends SparkAdapterSupport {
         //       entailing that table has to have at least one commit
         .withLatestInstantTime(tableState.latestCommitTimestamp.get)
         .withReadBlocksLazily(
-          Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-            HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-            .getOrElse(false))
+          ConfigUtils.getBooleanWithAltKeys(hadoopConf,
+            HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
         .withReverseReader(false)
         .withInternalSchema(internalSchema)
         .withBufferSize(
-          hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+          hadoopConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+            HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
         .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
         .withSpillableMapBasePath(
-          hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+          hadoopConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key,
+            FileIOUtils.getDefaultSpillableMapBasePath))
         .withDiskMapType(
           hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
             HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index cca1fd1da0d..24f80fa2502 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -19,22 +19,20 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{FileIOUtils, ValidationUtils}
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.Objects
 import java.util.function.Supplier
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-
 import scala.collection.JavaConverters._
 
 class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuilder {
@@ -71,8 +69,8 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         .withLogFilePaths(logFilePaths.asJava)
         .withReaderSchema(schema)
         .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
-        .withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
-        .withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
+        .withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
+        .withReverseReader(java.lang.Boolean.parseBoolean(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
         .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
         .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
         .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 2ea66fa3f07..81915953252 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieMemoryConfig, HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,7 +31,6 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
@@ -338,13 +337,16 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       .save(basePath)
 
     // Make force spill
-    spark.sparkContext.hadoopConfiguration.set(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP, "0.00001")
+    spark.sparkContext.hadoopConfiguration.set(
+      HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key, "0.00001")
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
     assertEquals(100, hudiSnapshotDF1.count()) // still 100, since we only updated
-    spark.sparkContext.hadoopConfiguration.set(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION)
+    spark.sparkContext.hadoopConfiguration.set(
+      HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key,
+      HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION)
   }
 
   @ParameterizedTest

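
Note: the `TestMORDataSource` change above also shows the practical use of the
renamed fraction key: an almost-zero value forces the merged records to spill
to disk, exercising the `ExternalSpillableMap` path. A standalone sketch of
that override (key name taken from the commit message):

    import org.apache.hadoop.conf.Configuration;

    public class ForceSpillExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // An almost-zero fraction shrinks the in-memory budget so reads spill
        // to the spillable map's disk backend almost immediately.
        conf.set("hoodie.memory.compaction.fraction", "0.00001");
        System.out.println(conf.get("hoodie.memory.compaction.fraction"));
      }
    }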