This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c0576131759 [HUDI-6798] Add record merging mode and implement 
event-time ordering in the new file group reader (#9894)
c0576131759 is described below

commit c05761317596585a3c0c3cc69a34b4407843351c
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun Jun 9 20:48:09 2024 -0700

    [HUDI-6798] Add record merging mode and implement event-time ordering in 
the new file group reader (#9894)
    
    This PR adds a new table config `hoodie.record.merge.mode` to control the
    record merging mode and behavior in the new file group reader
    (`HoodieFileGroupReader`) and implements event-time ordering in it.
    The config `hoodie.record.merge.mode` is going to be the single config that
    determines how the record merging happens in release 1.0 and beyond.
    
    ---------
    
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../hudi/client/TestTableSchemaEvolution.java      |   3 +
 .../hudi/common/config/HoodieCommonConfig.java     |   3 +
 .../apache/hudi/common/config/RecordMergeMode.java |  36 ++++
 .../hudi/common/table/HoodieTableConfig.java       |  13 +-
 .../hudi/common/table/HoodieTableMetaClient.java   | 114 ++++++++++-
 .../table/log/BaseHoodieLogRecordReader.java       |   7 +
 .../table/log/HoodieMergedLogRecordReader.java     |  13 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      | 209 ++++++++++++++++-----
 .../common/table/read/HoodieFileGroupReader.java   |  26 ++-
 .../table/read/TestHoodieFileGroupReaderBase.java  |  77 ++++++--
 .../common/table/TestHoodieTableMetaClient.java    | 144 ++++++++++++++
 .../hudi/common/table/read/TestCustomMerger.java   |   4 +
 .../common/table/read/TestEventTimeMerging.java    |   4 +
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |   6 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  11 +-
 15 files changed, 588 insertions(+), 82 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index f5fa70c6668..496b42c13d6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -48,6 +49,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.RECORD_MERGE_MODE;
 import static 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
@@ -165,6 +167,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     HoodieTableMetaClient.withPropertyBuilder()
         .fromMetaClient(metaClient)
         .setTableType(HoodieTableType.MERGE_ON_READ)
+        
.setRecordMergeMode(RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue()))
         .setTimelineLayoutVersion(VERSION_1)
         .initTable(metaClient.getStorageConf().newInstance(), 
metaClient.getBasePath());
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 1a4c2e31780..c96b07ee4f0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.config;
 
+import org.apache.hudi.common.table.HoodieTableConfig;
 import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 
@@ -81,6 +82,8 @@ public class HoodieCommonConfig extends HoodieConfig {
           + " operation will fail schema compatibility check. Set this option 
to true will make the missing "
           + " column be filled with null values to successfully complete the 
write operation.");
 
+  public static final ConfigProperty<String> RECORD_MERGE_MODE = 
HoodieTableConfig.RECORD_MERGE_MODE;
+
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> 
SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java 
b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
new file mode 100644
index 00000000000..641f3514ad6
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+@EnumDescription("Determines the logic of merging updates")
+public enum RecordMergeMode {
+  @EnumFieldDescription("Using transaction time to merge records, i.e., the 
record from later "
+      + "transaction overwrites the earlier record with the same key.")
+  OVERWRITE_WITH_LATEST,
+
+  @EnumFieldDescription("Using event time as the ordering to merge records, 
i.e., the record "
+      + "with the larger event time overwrites the record with the smaller 
event time on the "
+      + "same key, regardless of transaction time. The event time or 
preCombine field needs "
+      + "to be specified by the user.")
+  EVENT_TIME_ORDERING,
+
+  @EnumFieldDescription("Using custom merging logic specified by the user.")
+  CUSTOM
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 87263a13f9d..b3bf9668d93 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.BootstrapIndexType;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -45,8 +46,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -175,6 +176,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Version of timeline used, by the table.");
 
+  public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
+      .key("hoodie.record.merge.mode")
+      .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+      .sinceVersion("1.0.0")
+      .withDocumentation(RecordMergeMode.class);
+
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = 
ConfigProperty
       .key("hoodie.compaction.payload.class")
       .defaultValue(DefaultHoodieRecordPayload.class.getName())
@@ -532,6 +539,10 @@ public class HoodieTableConfig extends HoodieConfig {
     setValue(VERSION, Integer.toString(tableVersion.versionCode()));
   }
 
+  public RecordMergeMode getRecordMergeMode() {
+    return 
RecordMergeMode.valueOf(getStringOrDefault(RECORD_MERGE_MODE).toUpperCase());
+  }
+
   /**
    * Read the payload class for HoodieRecords from the table properties.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e8e99ff9a0c..250091ecec6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -22,17 +22,20 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetaserverConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.fs.ConsistencyGuard;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.model.BootstrapIndexType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -47,7 +50,6 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -76,10 +78,14 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
 import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.io.storage.HoodieIOFactory.getIOFactory;
 
 /**
@@ -161,7 +167,7 @@ public class HoodieTableMetaClient implements Serializable {
     Option<TimelineLayoutVersion> tableConfigVersion = 
tableConfig.getTimelineLayoutVersion();
     if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
       // Ensure layout version passed in config is not lower than the one seen 
in hoodie.properties
-      
ValidationUtils.checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get())
 >= 0,
+      checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 
0,
           "Layout Version defined in hoodie properties has higher version (" + 
tableConfigVersion.get()
               + ") than the one passed in config (" + layoutVersion.get() + 
")");
     }
@@ -196,7 +202,7 @@ public class HoodieTableMetaClient implements Serializable {
                                    String indexType,
                                    Map<String, Map<String, String>> columns,
                                    Map<String, String> options) {
-    ValidationUtils.checkState(
+    checkState(
         !indexMetadataOpt.isPresent() || 
!indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName),
         "Index metadata is already present");
     List<String> columnNames = new ArrayList<>(columns.keySet());
@@ -892,9 +898,9 @@ public class HoodieTableMetaClient implements Serializable {
     }
 
     public HoodieTableMetaClient build() {
-      ValidationUtils.checkArgument(conf != null || storage != null,
+      checkArgument(conf != null || storage != null,
           "Storage configuration or HoodieStorage needs to be set to init 
HoodieTableMetaClient");
-      ValidationUtils.checkArgument(basePath != null, "basePath needs to be 
set to init HoodieTableMetaClient");
+      checkArgument(basePath != null, "basePath needs to be set to init 
HoodieTableMetaClient");
       if (timeGeneratorConfig == null) {
         timeGeneratorConfig = 
HoodieTimeGeneratorConfig.newBuilder().withPath(basePath).build();
       }
@@ -923,6 +929,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String recordKeyFields;
     private String secondaryKeyFields;
     private String archiveLogFolder;
+    private RecordMergeMode recordMergeMode;
     private String payloadClassName;
     private String payloadType;
     private String recordMergerStrategy;
@@ -999,6 +1006,11 @@ public class HoodieTableMetaClient implements 
Serializable {
       return this;
     }
 
+    public PropertyBuilder setRecordMergeMode(RecordMergeMode recordMergeMode) 
{
+      this.recordMergeMode = recordMergeMode;
+      return this;
+    }
+
     public PropertyBuilder setPayloadClassName(String payloadClassName) {
       this.payloadClassName = payloadClassName;
       return this;
@@ -1144,6 +1156,7 @@ public class HoodieTableMetaClient implements 
Serializable {
       return setTableType(metaClient.getTableType())
           .setTableName(metaClient.getTableConfig().getTableName())
           
.setArchiveLogFolder(metaClient.getTableConfig().getArchivelogFolder())
+          .setRecordMergeMode(metaClient.getTableConfig().getRecordMergeMode())
           .setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
           
.setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy());
     }
@@ -1173,6 +1186,10 @@ public class HoodieTableMetaClient implements 
Serializable {
         setArchiveLogFolder(
             hoodieConfig.getString(HoodieTableConfig.ARCHIVELOG_FOLDER));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE)) {
+        setRecordMergeMode(
+            
RecordMergeMode.valueOf(hoodieConfig.getString(HoodieTableConfig.RECORD_MERGE_MODE).toUpperCase()));
+      }
       if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)) {
         
setPayloadClassName(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
       } else if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)) {
@@ -1262,8 +1279,8 @@ public class HoodieTableMetaClient implements 
Serializable {
     }
 
     public Properties build() {
-      ValidationUtils.checkArgument(tableType != null, "tableType is null");
-      ValidationUtils.checkArgument(tableName != null, "tableName is null");
+      checkArgument(tableType != null, "tableType is null");
+      checkArgument(tableName != null, "tableName is null");
 
       HoodieTableConfig tableConfig = new HoodieTableConfig();
 
@@ -1285,6 +1302,11 @@ public class HoodieTableMetaClient implements 
Serializable {
         if (recordMergerStrategy != null) {
           tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, 
recordMergerStrategy);
         }
+        inferRecordMergeMode();
+        validateMergeConfigs();
+        if (recordMergeMode != null) {
+          tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
+        }
       }
 
       if (null != tableCreateSchema) {
@@ -1385,5 +1407,83 @@ public class HoodieTableMetaClient implements 
Serializable {
         throws IOException {
       return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, 
basePath, build());
     }
+
+    private void inferRecordMergeMode() {
+      if (null == recordMergeMode) {
+        boolean payloadClassNameSet = null != payloadClassName;
+        boolean payloadTypeSet = null != payloadType;
+        boolean recordMergerStrategySet = null != recordMergerStrategy;
+
+        if (!recordMergerStrategySet
+            || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+          if (payloadClassNameSet) {
+            if 
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
+              recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+            } else if 
(payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
+              recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+            } else {
+              recordMergeMode = RecordMergeMode.CUSTOM;
+            }
+          } else if (payloadTypeSet) {
+            if 
(payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) {
+              recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+            } else if 
(payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) {
+              recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+            } else {
+              recordMergeMode = RecordMergeMode.CUSTOM;
+            }
+          } else {
+            LOG.warn("One of the payload class name or payload type must be 
set for the MERGE_ON_READ table");
+            recordMergeMode = 
RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue());
+            LOG.warn("Setting the record merge mode to the default: {}", 
recordMergeMode);
+          }
+        } else {
+          // Custom merger strategy is set
+          recordMergeMode = RecordMergeMode.CUSTOM;
+        }
+      }
+    }
+
+    private void validateMergeConfigs() {
+      boolean payloadClassNameSet = null != payloadClassName;
+      boolean payloadTypeSet = null != payloadType;
+      boolean recordMergerStrategySet = null != recordMergerStrategy;
+      boolean recordMergeModeSet = null != recordMergeMode;
+
+      checkArgument(recordMergeModeSet,
+          "Record merge mode " + HoodieTableConfig.RECORD_MERGE_MODE.key() + " 
should be set");
+      switch (recordMergeMode) {
+        case OVERWRITE_WITH_LATEST:
+          checkArgument((!payloadClassNameSet && !payloadTypeSet)
+                  || (payloadClassNameSet && 
payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName()))
+                  || (payloadTypeSet && 
payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())),
+              constructMergeConfigErrorMessage());
+          break;
+        case EVENT_TIME_ORDERING:
+          checkArgument((!payloadClassNameSet && !payloadTypeSet)
+                  || (payloadClassNameSet && 
payloadClassName.equals(DefaultHoodieRecordPayload.class.getName()))
+                  || (payloadTypeSet && 
payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())),
+              constructMergeConfigErrorMessage());
+          checkArgument(!recordMergerStrategySet
+                  || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID),
+              "Record merger strategy (" + (recordMergerStrategySet ? 
recordMergerStrategy : "null")
+                  + ") should be consistent with the record merging mode 
EVENT_TIME_ORDERING");
+          break;
+        case CUSTOM:
+        default:
+          // No op
+      }
+    }
+
+    private String constructMergeConfigErrorMessage() {
+      StringBuilder stringBuilder = new StringBuilder();
+      stringBuilder.append("Payload class name (");
+      stringBuilder.append(payloadClassName != null ? payloadClassName : 
"null");
+      stringBuilder.append(") or type (");
+      stringBuilder.append(payloadType != null ? payloadType : "null");
+      stringBuilder.append(") should be consistent with the record merge mode 
");
+      stringBuilder.append(recordMergeMode);
+      return stringBuilder.toString();
+    }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index d58c54a929f..2f38dc9b258 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -97,6 +98,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected final String preCombineField;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
+  // Record merge mode
+  protected final RecordMergeMode recordMergeMode;
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
@@ -148,6 +151,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
                                       Option<String> keyFieldOverride,
                                       boolean enableOptimizedLogBlocksScan,
                                       HoodieRecordMerger recordMerger,
+                                      RecordMergeMode recordMergeMode,
                                       HoodieFileGroupRecordBuffer<T> 
recordBuffer) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
@@ -166,6 +170,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     }
     this.payloadProps = props;
     this.recordMerger = recordMerger;
+    this.recordMergeMode = recordMergeMode;
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
@@ -866,6 +871,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
       throw new UnsupportedOperationException();
     }
 
+    public abstract Builder withRecordMergeMode(RecordMergeMode 
recordMergeMode);
+
     public Builder withOptimizedLogBlocksScan(boolean 
enableOptimizedLogBlocksScan) {
       throw new UnsupportedOperationException();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index bcc821a34a1..c79de61536c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
@@ -75,9 +76,10 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
                                       Option<String> keyFieldOverride,
                                       boolean enableOptimizedLogBlocksScan,
                                       HoodieRecordMerger recordMerger,
+                                      RecordMergeMode recordMergeMode,
                                       HoodieFileGroupRecordBuffer<T> 
recordBuffer) {
     super(readerContext, storage, logFilePaths, reverseReader, bufferSize, 
instantRange, withOperationField,
-        forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordMerger, recordBuffer);
+        forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordMerger, recordMergeMode, recordBuffer);
     this.scannedPrefixes = new HashSet<>();
 
     if (forceFullScan) {
@@ -228,6 +230,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger = 
HoodiePreCombineAvroRecordMerger.INSTANCE;
+    private RecordMergeMode recordMergeMode;
 
     private HoodieFileGroupRecordBuffer<T> recordBuffer;
 
@@ -293,6 +296,12 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
       return this;
     }
 
+    @Override
+    public Builder<T> withRecordMergeMode(RecordMergeMode recordMergeMode) {
+      this.recordMergeMode = recordMergeMode;
+      return this;
+    }
+
     public Builder<T> withKeyFiledOverride(String keyFieldOverride) {
       this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
       return this;
@@ -324,7 +333,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
           withOperationField, forceFullScan,
           Option.ofNullable(partitionName),
           Option.ofNullable(keyFieldOverride),
-          enableOptimizedLogBlocksScan, recordMerger,
+          enableOptimizedLogBlocksScan, recordMerger, recordMergeMode,
           recordBuffer);
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index ed8d643b215..984d9740ceb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
@@ -33,11 +34,13 @@ import 
org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -60,12 +63,14 @@ import java.util.function.Function;
 
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+import static 
org.apache.hudi.common.table.read.HoodieFileGroupReader.getRecordMergeMode;
 
 public abstract class HoodieBaseFileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
   protected final Option<String> partitionNameOverrideOpt;
   protected final Option<String[]> partitionPathFieldOpt;
+  protected final RecordMergeMode recordMergeMode;
   protected final HoodieRecordMerger recordMerger;
   protected final TypedProperties payloadProps;
   protected final ExternalSpillableMap<Serializable, Pair<Option<T>, 
Map<String, Object>>> records;
@@ -90,6 +95,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
     this.partitionPathFieldOpt = partitionPathFieldOpt;
+    this.recordMergeMode = getRecordMergeMode(payloadProps);
     this.recordMerger = recordMerger;
     this.payloadProps = payloadProps;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
@@ -147,6 +153,37 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     records.clear();
   }
 
+  /**
+   * Compares two {@link Comparable}s.  If both are numbers, converts them to 
{@link Long} for comparison.
+   * If one of the {@link Comparable}s is a String, assumes that both are 
String values for comparison.
+   *
+   * @param o1 {@link Comparable} object.
+   * @param o2 other {@link Comparable} object to compare to.
+   * @return comparison result.
+   */
+  @VisibleForTesting
+  static int compareTo(Comparable o1, Comparable o2) {
+    // TODO(HUDI-7848): fix the delete records to contain the correct ordering 
value type
+    //  so this util with the number comparison is not necessary.
+    try {
+      return o1.compareTo(o2);
+    } catch (ClassCastException e) {
+      if (o1 instanceof Number && o2 instanceof Number) {
+        Long o1LongValue = ((Number) o1).longValue();
+        Long o2LongValue = ((Number) o2).longValue();
+        return o1LongValue.compareTo(o2LongValue);
+      } else if (o1 instanceof String || o2 instanceof String) {
+        return o1.toString().compareTo(o2.toString());
+      } else {
+        throw new IllegalArgumentException("Cannot compare values in different 
types: "
+            + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + 
")");
+      }
+    } catch (Throwable e) {
+      throw new HoodieException("Cannot compare values: "
+          + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")", 
e);
+    }
+  }
+
   /**
    * Merge two log data records if needed.
    *
@@ -160,42 +197,76 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                                                          
Map<String, Object> metadata,
                                                                          
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws 
IOException {
     if (existingRecordMetadataPair != null) {
-      // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should 
be put as
-      // the `older` in the merge API
-      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
enablePartialMerging
-          ? recordMerger.partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          (Schema) metadata.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-          (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-          readerSchema,
-          payloadProps)
-          : recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          (Schema) metadata.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-          (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-          payloadProps);
-
-      if (!combinedRecordAndSchemaOpt.isPresent()) {
+      if (enablePartialMerging) {
+        // TODO(HUDI-7843): decouple the merging logic from the merger
+        //  and use the record merge mode to control how to merge partial 
updates
+        // Merge and store the combined record
+        // Note that the incoming `record` is from an older commit, so it 
should be put as
+        // the `older` in the merge API
+        Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.partialMerge(
+            readerContext.constructHoodieRecord(Option.of(record), metadata),
+            (Schema) metadata.get(INTERNAL_META_SCHEMA),
+            readerContext.constructHoodieRecord(
+                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
+            (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+            readerSchema,
+            payloadProps);
+        if (!combinedRecordAndSchemaOpt.isPresent()) {
+          return Option.empty();
+        }
+        Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get();
+        HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
+        // If pre-combine returns existing record, no need to update it
+        if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
+          return Option.of(Pair.of(
+              combinedRecord.getData(),
+              
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, 
combinedRecordAndSchema.getRight())));
+        }
         return Option.empty();
+      } else {
+        switch (recordMergeMode) {
+          case OVERWRITE_WITH_LATEST:
+            return Option.empty();
+          case EVENT_TIME_ORDERING:
+            Comparable existingOrderingValue = readerContext.getOrderingValue(
+                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+            if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingValue)) {
+              return Option.empty();
+            }
+            Comparable incomingOrderingValue = readerContext.getOrderingValue(
+                Option.of(record), metadata, readerSchema, payloadProps);
+            if (compareTo(incomingOrderingValue, existingOrderingValue) > 0) {
+              return Option.of(Pair.of(record, metadata));
+            }
+            return Option.empty();
+          case CUSTOM:
+          default:
+            // Merge and store the combined record
+            // Note that the incoming `record` is from an older commit, so it 
should be put as
+            // the `older` in the merge API
+            Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.merge(
+                readerContext.constructHoodieRecord(Option.of(record), 
metadata),
+                (Schema) metadata.get(INTERNAL_META_SCHEMA),
+                readerContext.constructHoodieRecord(
+                    existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
+                (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+                payloadProps);
+
+            if (!combinedRecordAndSchemaOpt.isPresent()) {
+              return Option.empty();
+            }
+
+            Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get();
+            HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
+            // If pre-combine returns existing record, no need to update it
+            if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
+              return Option.of(Pair.of(combinedRecord.getData(), metadata));
+            }
+            return Option.empty();
+        }
       }
-
-      Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get();
-      HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
-
-      // If pre-combine returns existing record, no need to update it
-      if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().get()) {
-        return Option.of(Pair.of(
-            combinedRecord.getData(),
-            enablePartialMerging
-                ? 
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, 
combinedRecordAndSchema.getRight())
-                : metadata));
-      }
-      return Option.empty();
     } else {
       // Put the record as is
       // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
@@ -265,7 +336,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
    * @param dataBlock current processed block
    * @return final read schema.
    */
-  protected Option<Pair<Function<T,T>, Schema>> 
composeEvolvedSchemaTransformer(
+  protected Option<Pair<Function<T, T>, Schema>> 
composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
     if (internalSchema.isEmptySchema()) {
       return Option.empty();
@@ -274,7 +345,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
     InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
         hoodieTableMetaClient, false);
-    Pair<InternalSchema, Map<String,String>> mergedInternalSchema = new 
InternalSchemaMerger(fileSchema, internalSchema,
+    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new 
InternalSchemaMerger(fileSchema, internalSchema,
         true, false, false).mergeSchemaGetRenamed();
     Schema mergedAvroSchema = 
AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), 
readerSchema.getFullName());
     assert mergedAvroSchema.equals(readerSchema);
@@ -297,32 +368,63 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
       return newer;
     }
 
-    Option<Pair<HoodieRecord, Schema>> mergedRecord;
     if (enablePartialMerging) {
-      mergedRecord = recordMerger.partialMerge(
+      // TODO(HUDI-7843): decouple the merging logic from the merger
+      //  and use the record merge mode to control how to merge partial updates
+      Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.partialMerge(
           readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA),
           readerSchema, payloadProps);
-    } else {
-      mergedRecord = recordMerger.merge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
-    }
 
-    if (mergedRecord.isPresent()
-        && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
-      if (!mergedRecord.get().getRight().equals(readerSchema)) {
-        return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
+      if (mergedRecord.isPresent()
+          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
+        if (!mergedRecord.get().getRight().equals(readerSchema)) {
+          return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
+        }
+        return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+      }
+      return Option.empty();
+    } else {
+      switch (recordMergeMode) {
+        case OVERWRITE_WITH_LATEST:
+          return newer;
+        case EVENT_TIME_ORDERING:
+          Comparable oldOrderingValue = readerContext.getOrderingValue(
+              older, olderInfoMap, readerSchema, payloadProps);
+          if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
+            return newer;
+          }
+          Comparable newOrderingValue = readerContext.getOrderingValue(
+              newer, newerInfoMap, readerSchema, payloadProps);
+          if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
+            return Option.empty();
+          }
+          if (compareTo(oldOrderingValue, newOrderingValue) > 0) {
+            return older;
+          }
+          return newer;
+        case CUSTOM:
+        default:
+          Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
+              readerContext.constructHoodieRecord(older, olderInfoMap), 
(Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+              readerContext.constructHoodieRecord(newer, newerInfoMap), 
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+
+          if (mergedRecord.isPresent()
+              && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
+            if (!mergedRecord.get().getRight().equals(readerSchema)) {
+              return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
+            }
+            return Option.ofNullable((T) 
mergedRecord.get().getLeft().getData());
+          }
+          return Option.empty();
       }
-      return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
     }
-    return Option.empty();
   }
 
   /**
    * Filter a record for downstream processing when:
-   *  1. A set of pre-specified keys exists.
-   *  2. The key of the record is not contained in the set.
+   * 1. A set of pre-specified keys exists.
+   * 2. The key of the record is not contained in the set.
    */
   protected boolean shouldSkip(T record, String keyFieldName, boolean 
isFullKey, Set<String> keys, Schema writerSchema) {
     String recordKey = readerContext.getValue(record, writerSchema, 
keyFieldName).toString();
@@ -419,4 +521,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
         .orElseGet(dataBlock::getSchema);
     return Pair.of(transformer, evolvedSchema);
   }
+
+  private boolean isDeleteRecordWithNaturalOrder(Option<T> rowOption,
+                                                 Comparable orderingValue) {
+    return rowOption.isEmpty() && orderingValue.equals(0);
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 396da4166a7..8661a91a12f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -19,7 +19,9 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.BaseFile;
@@ -46,11 +48,13 @@ import org.apache.avro.Schema;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
 /**
  * A file group reader that iterates through the records in a single file 
group.
@@ -73,6 +77,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private final long length;
   // Core structure to store and process records.
   private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+  private final RecordMergeMode recordMergeMode;
   private ClosableIterator<T> baseFileIterator;
   private final HoodieRecordMerger recordMerger;
   private final Option<UnaryOperator<T>> outputConverter;
@@ -102,6 +107,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.props = props;
     this.start = start;
     this.length = length;
+    this.recordMergeMode = getRecordMergeMode(props);
     this.recordMerger = 
readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
     readerContext.setRecordMerger(this.recordMerger);
     readerContext.setTablePath(tablePath);
@@ -154,11 +160,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
 
   private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) throws IOException {
     BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
-    Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
readerContext.getSchemaHandler().getBootstrapRequiredFields();
-    Pair<List<Schema.Field>,List<Schema.Field>> allFields = 
readerContext.getSchemaHandler().getBootstrapDataFields();
-    Option<Pair<ClosableIterator<T>,Schema>> dataFileIterator =
+    Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = 
readerContext.getSchemaHandler().getBootstrapRequiredFields();
+    Pair<List<Schema.Field>, List<Schema.Field>> allFields = 
readerContext.getSchemaHandler().getBootstrapDataFields();
+    Option<Pair<ClosableIterator<T>, Schema>> dataFileIterator =
         makeBootstrapBaseFileIteratorHelper(requiredFields.getRight(), 
allFields.getRight(), dataFile);
-    Option<Pair<ClosableIterator<T>,Schema>> skeletonFileIterator =
+    Option<Pair<ClosableIterator<T>, Schema>> skeletonFileIterator =
         makeBootstrapBaseFileIteratorHelper(requiredFields.getLeft(), 
allFields.getLeft(), baseFile);
     if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
       throw new IllegalStateException("should not be here if only partition 
cols are required");
@@ -180,9 +186,9 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
    * @param file           file to be read
    * @return pair of the record iterator of the file, and the schema of the 
data being read
    */
-  private Option<Pair<ClosableIterator<T>,Schema>> 
makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields,
-                                                                               
        List<Schema.Field> allFields,
-                                                                               
        BaseFile file) throws IOException {
+  private Option<Pair<ClosableIterator<T>, Schema>> 
makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields,
+                                                                               
         List<Schema.Field> allFields,
+                                                                               
         BaseFile file) throws IOException {
     if (requiredFields.isEmpty()) {
       return Option.empty();
     }
@@ -225,6 +231,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         .withPartition(getRelativePartitionPath(
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordMerger(recordMerger)
+        .withRecordMergeMode(recordMergeMode)
         .withRecordBuffer(recordBuffer)
         .build();
     logRecordReader.close();
@@ -244,6 +251,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     return new HoodieFileGroupReaderIterator<>(this);
   }
 
+  public static RecordMergeMode getRecordMergeMode(Properties props) {
+    String mergeMode = getStringWithAltKeys(props, 
HoodieCommonConfig.RECORD_MERGE_MODE, true).toUpperCase();
+    return RecordMergeMode.valueOf(mergeMode);
+  }
+
   public static class HoodieFileGroupReaderIterator<T> implements 
ClosableIterator<T> {
     private HoodieFileGroupReader<T> reader;
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a8a95887c18..9f3f8acf81c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.read;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -42,22 +43,28 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.compareTo;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Tests {@link HoodieFileGroupReader} with different engines
@@ -80,9 +87,55 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                   Schema schema,
                                                   String fileGroupId);
 
+  public abstract Comparable getComparableUTF8String(String value);
+
+  @Test
+  public void testCompareToComparable() {
+    // Test same type
+    assertEquals(1, compareTo(Boolean.TRUE, Boolean.FALSE));
+    assertEquals(0, compareTo(Boolean.TRUE, Boolean.TRUE));
+    assertEquals(-1, compareTo(Boolean.FALSE, Boolean.TRUE));
+    assertEquals(1, compareTo(20, 15));
+    assertEquals(0, compareTo(15, 15));
+    assertEquals(-1, compareTo(10, 15));
+    assertEquals(1, compareTo(1.1f, 1.0f));
+    assertEquals(0, compareTo(1.0f, 1.0f));
+    assertEquals(-1, compareTo(0.9f, 1.0f));
+    assertEquals(1, compareTo(1.1, 1.0));
+    assertEquals(0, compareTo(1.0, 1.0));
+    assertEquals(-1, compareTo(0.9, 1.0));
+    assertEquals(1, compareTo("value2", "value1"));
+    assertEquals(0, compareTo("value1", "value1"));
+    assertEquals(-1, compareTo("value1", "value2"));
+    // Test different types which are comparable
+    assertEquals(1, compareTo(Long.MAX_VALUE / 2L, 10));
+    assertEquals(1, compareTo(20, 10L));
+    assertEquals(0, compareTo(10L, 10));
+    assertEquals(0, compareTo(10, 10L));
+    assertEquals(-1, compareTo(10, Long.MAX_VALUE));
+    assertEquals(-1, compareTo(10L, 20));
+    assertEquals(1, compareTo(getComparableUTF8String("value2"), "value1"));
+    assertEquals(1, compareTo("value2", getComparableUTF8String("value1")));
+    assertEquals(0, compareTo(getComparableUTF8String("value1"), "value1"));
+    assertEquals(0, compareTo("value1", getComparableUTF8String("value1")));
+    assertEquals(-1, compareTo(getComparableUTF8String("value1"), "value2"));
+    assertEquals(-1, compareTo("value1", getComparableUTF8String("value2")));
+  }
+
+  private static Stream<Arguments> testArguments() {
+    return Stream.of(
+        arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"),
+        arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "parquet"),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro"),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet"),
+        arguments(RecordMergeMode.CUSTOM, "avro"),
+        arguments(RecordMergeMode.CUSTOM, "parquet")
+    );
+  }
+
   @ParameterizedTest
-  @ValueSource(strings = {"avro", "parquet"})
-  public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) 
throws Exception {
+  @MethodSource("testArguments")
+  public void testReadFileGroupInMergeOnReadTable(RecordMergeMode 
recordMergeMode, String logDataBlockFormat) throws Exception {
     Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
 
@@ -90,23 +143,23 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // One commit; reading one file group containing a base file only
       commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
0);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
0, recordMergeMode);
 
       // Two commits; reading one file group containing a base file and a log 
file
       commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), 
UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
1);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
1, recordMergeMode);
 
       // Three commits; reading one file group containing a base file and two 
log files
       commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), 
UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
2);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
2, recordMergeMode);
     }
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {"avro", "parquet"})
-  public void testReadLogFilesOnlyInMergeOnReadTable(String 
logDataBlockFormat) throws Exception {
+  @MethodSource("testArguments")
+  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode 
recordMergeMode, String logDataBlockFormat) throws Exception {
     Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
     // Use InMemoryIndex to generate log only mor table
@@ -116,12 +169,12 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       // One commit; reading one file group containing a base file only
       commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
1);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
1, recordMergeMode);
 
       // Two commits; reading one file group containing a base file and a log 
file
       commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), 
UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
2);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
2, recordMergeMode);
     }
   }
 
@@ -145,7 +198,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                  String tablePath,
                                                  String[] partitionPaths,
                                                  boolean containsBaseFile,
-                                                 int expectedLogFileNum) 
throws Exception {
+                                                 int expectedLogFileNum,
+                                                 RecordMergeMode 
recordMergeMode) throws Exception {
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
     Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
     HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(storageConf);
@@ -165,6 +219,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
     props.setProperty("hoodie.payload.ordering.field", "timestamp");
     props.setProperty(RECORD_MERGER_STRATEGY.key(), 
RECORD_MERGER_STRATEGY.defaultValue());
+    props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
     if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
       props.setProperty(PARTITION_FIELDS.key(), 
metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index b4e2fca80d3..60358872fc8 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi.common.table;
 
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.RecordPayloadType;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -28,16 +34,24 @@ import org.apache.hudi.common.util.Option;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Tests hoodie table meta client {@link HoodieTableMetaClient}.
@@ -109,6 +123,136 @@ public class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
         "Commit value should be \"test-detail\"");
   }
 
+  private static Stream<Arguments> argumentsForInferringRecordMergeMode() {
+    Stream<Arguments> arguments = Stream.of(
+        // Record merger strategy is not set
+        // Payload class is set, payload type is not set
+        arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+            Option.empty(), Option.empty(), 
RecordMergeMode.OVERWRITE_WITH_LATEST),
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+            Option.empty(), Option.empty(), 
RecordMergeMode.EVENT_TIME_ORDERING),
+        arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+            Option.empty(), Option.empty(), RecordMergeMode.CUSTOM),
+        // Record merger strategy is not set
+        // Payload class is set, payload type is set; payload class takes 
precedence
+        arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+            Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.empty(), RecordMergeMode.OVERWRITE_WITH_LATEST),
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+            Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.empty(), RecordMergeMode.EVENT_TIME_ORDERING),
+        arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+            Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.empty(), RecordMergeMode.CUSTOM),
+        // Record merger strategy is set to default
+        // Payload class is set, payload type is not set
+        arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+            Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST),
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+            Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING),
+        arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+            Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.CUSTOM),
+        // Record merger strategy is set to default
+        // Payload class is not set, payload type is set
+        arguments(Option.empty(), 
Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST),
+        arguments(Option.empty(), 
Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING),
+        arguments(Option.empty(), 
Option.of(RecordPayloadType.HOODIE_METADATA.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.CUSTOM),
+        // Record merger strategy is set to default
+        // Payload class or payload type is not set
+        arguments(Option.empty(), Option.empty(), 
Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue())),
+        // Record merger strategy is set to custom
+        arguments(Option.empty(), Option.empty(), 
Option.of("custom_merge_strategy"),
+            RecordMergeMode.CUSTOM),
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+            Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.of("custom_merge_strategy"), RecordMergeMode.CUSTOM)
+    );
+    return arguments;
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForInferringRecordMergeMode")
+  public void testInferRecordMergeMode(Option<String> payloadClassName,
+                                       Option<String> payloadType,
+                                       Option<String> recordMergerStrategy,
+                                       RecordMergeMode 
expectedRecordMergeMode) {
+    HoodieTableMetaClient.PropertyBuilder builder = 
HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.MERGE_ON_READ.name())
+        .setTableName("table_name");
+    if (payloadClassName.isPresent()) {
+      builder.setPayloadClassName(payloadClassName.get());
+    }
+    if (payloadType.isPresent()) {
+      builder.setPayloadType(payloadType.get());
+    }
+    if (recordMergerStrategy.isPresent()) {
+      builder.setRecordMergerStrategy(recordMergerStrategy.get());
+    }
+    assertEquals(expectedRecordMergeMode,
+        
RecordMergeMode.valueOf(builder.build().getProperty(RECORD_MERGE_MODE.key())));
+  }
+
+  private static Stream<Arguments> 
argumentsForValidationFailureOnMergeConfigs() {
+    Stream<Arguments> arguments = Stream.of(
+        arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), 
Option.empty(),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST,
+            "Payload class name 
(org.apache.hudi.common.model.DefaultHoodieRecordPayload) or type "
+                + "(null) should be consistent with the record merge mode 
OVERWRITE_WITH_LATEST"),
+        arguments(Option.empty(), 
Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.OVERWRITE_WITH_LATEST,
+            "Payload class name (null) or type (HOODIE_AVRO_DEFAULT) "
+                + "should be consistent with the record merge mode 
OVERWRITE_WITH_LATEST"),
+        arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), 
Option.empty(),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING,
+            "Payload class name 
(org.apache.hudi.common.model.OverwriteWithLatestAvroPayload) or type "
+                + "(null) should be consistent with the record merge mode 
EVENT_TIME_ORDERING"),
+        arguments(Option.empty(), 
Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+            Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+            RecordMergeMode.EVENT_TIME_ORDERING,
+            "Payload class name (null) or type (OVERWRITE_LATEST_AVRO) "
+                + "should be consistent with the record merge mode 
EVENT_TIME_ORDERING")
+    );
+    return arguments;
+  }
+
+  @ParameterizedTest
+  @MethodSource("argumentsForValidationFailureOnMergeConfigs")
+  public void testValidationFailureOnMergeConfigs(Option<String> 
payloadClassName,
+                                                  Option<String> payloadType,
+                                                  Option<String> 
recordMergerStrategy,
+                                                  RecordMergeMode 
recordMergeMode,
+                                                  String expectedErrorMessage) 
{
+    HoodieTableMetaClient.PropertyBuilder builder = 
HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.MERGE_ON_READ.name())
+        .setTableName("table_name")
+        .setRecordMergeMode(recordMergeMode);
+    if (payloadClassName.isPresent()) {
+      builder.setPayloadClassName(payloadClassName.get());
+    }
+    if (payloadType.isPresent()) {
+      builder.setPayloadType(payloadType.get());
+    }
+    if (recordMergerStrategy.isPresent()) {
+      builder.setRecordMergerStrategy(recordMergerStrategy.get());
+    }
+    IllegalArgumentException exception = assertThrows(
+        IllegalArgumentException.class, builder::build);
+    assertEquals(expectedErrorMessage, exception.getMessage());
+  }
+
   @Test
   public void testEquals() throws IOException {
     HoodieTableMetaClient metaClient1 = 
HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), getTableType());
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 3e80d4bee56..4ec1c0556b0 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -58,6 +60,8 @@ public class TestCustomMerger extends 
HoodieFileGroupReaderTestHarness {
     readerContext = new HoodieTestReaderContext(
         Option.of(new CustomAvroMerger()),
         Option.of(HoodieRecordTestPayload.class.getName()));
+    properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name());
 
     // -------------------------------------------------------------
     // The test logic is as follows:
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index bf0fac19c67..3b3fc3c4359 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.common.table.read;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
@@ -55,6 +57,8 @@ public class TestEventTimeMerging extends 
HoodieFileGroupReaderTestHarness {
     readerContext = new HoodieTestReaderContext(
         Option.of(merger),
         Option.of(HoodieRecordTestPayload.class.getName()));
+    properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.EVENT_TIME_ORDERING.name());
 
     // -------------------------------------------------------------
     // The test logic is as follows:
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index e59e65bea3e..f61db4ee247 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -19,7 +19,9 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
@@ -112,13 +114,15 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
     ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new 
HoodieSparkRecordMerger());
     ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, 
avroSchema, avroSchema,
         Option.empty(), metaClient.getTableConfig()));
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.CUSTOM.name());
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
         ctx,
         metaClient,
         partitionNameOpt,
         partitionFields,
         ctx.getRecordMerger(),
-        new TypedProperties(),
+        props,
         1024 * 1024 * 1000,
         metaClient.getTempFolderPath(),
         ExternalSpillableMap.DiskMapType.ROCKS_DB,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 747fcb9a2eb..e20104858b6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,8 +19,6 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
 import 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -28,14 +26,19 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.{HoodieSparkRecordMerger, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, 
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import java.util
+
 import scala.collection.JavaConverters._
 
 /**
@@ -114,4 +117,8 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     assertEquals(0, expectedDf.except(actualDf).count())
     assertEquals(0, actualDf.except(expectedDf).count())
   }
+
+  override def getComparableUTF8String(value: String): Comparable[_] = {
+    UTF8String.fromString(value)
+  }
 }

Reply via email to