This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 27a950ee223 [HUDI-8850] Fix record merge mode related issues and improve test coverage in Spark SQL (#12725)
27a950ee223 is described below

commit 27a950ee22320d7b447fd3e31693ae9290dd5842
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jan 30 02:07:55 2025 -0800

    [HUDI-8850] Fix record merge mode related issues and improve test coverage in Spark SQL (#12725)
    
    This PR fixes record merge mode issues and improves test coverage in Spark SQL:
    
    - Table version 6 tables lack the record merge mode in the table config. The file group reader now infers it when missing, enabling reads of version 6 tables (see the sketch after this list).
    - For INSERT INTO in Spark SQL, buildHoodieInsertConfig did not set the record merge mode properly; that logic is now removed.
    - The SQL writer now picks up the correct record merge mode from the table config automatically.
    - For MERGE INTO in Spark SQL, the record merge mode check is fixed to avoid a NullPointerException.
    - More tests are added in TestMergeModeCommitTimeOrdering and TestMergeModeEventTimeOrdering for different merge modes, covering both table versions 6 and 8.
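    
    For context, a minimal sketch of the version-aware fallback described in the first
    bullet, assuming only the HoodieTableConfig.inferCorrectMergingBehavior signature
    shown in the HoodieFileGroupReader diff below. This is a Scala rendering of the
    Java change for illustration; resolveMergeBehavior is a hypothetical helper name:
    
        // For table versions below 8, the merge mode may be absent from the table
        // config, so it is inferred from the payload class and merge strategy id.
        def resolveMergeBehavior(tableConfig: HoodieTableConfig): (RecordMergeMode, String) = {
          var mergeMode = tableConfig.getRecordMergeMode
          var strategyId = tableConfig.getRecordMergeStrategyId
          if (!tableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
            val triple = HoodieTableConfig.inferCorrectMergingBehavior(
              mergeMode, tableConfig.getPayloadClass, strategyId, null)
            mergeMode = triple.getLeft    // inferred record merge mode
            strategyId = triple.getRight  // inferred merge strategy id
          }
          (mergeMode, strategyId)
        }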
---
 .../common/table/read/HoodieFileGroupReader.java   |  16 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  18 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  13 +-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  33 +-
 .../spark/sql/hudi/dml/TestMergeIntoTable2.scala   |   3 +-
 .../hudi/dml/TestMergeModeCommitTimeOrdering.scala | 349 +++++++++++++--------
 .../hudi/dml/TestMergeModeEventTimeOrdering.scala  | 263 ++++++++++------
 7 files changed, 448 insertions(+), 247 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index bf9653efbf3..e77bded8a77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
@@ -102,9 +104,17 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     this.start = start;
     this.length = length;
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+    RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+    String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
+    if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
+          recordMergeMode, tableConfig.getPayloadClass(),
+          mergeStrategyId, null);
+      recordMergeMode = triple.getLeft();
+      mergeStrategyId = triple.getRight();
+    }
     readerContext.setRecordMerger(readerContext.getRecordMerger(
-        tableConfig.getRecordMergeMode(),
-        tableConfig.getRecordMergeStrategyId(),
+        recordMergeMode, mergeStrategyId,
         props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
             props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
     readerContext.setTablePath(tablePath);
@@ -122,7 +132,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-        tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
+        recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
         isSkipMerge, shouldUseRecordPosition, readStats);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7de0617b23b..d7de38e9de9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
+import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,21 +257,7 @@ trait ProvidesHoodieConfig extends Logging {
         Map()
     }
 
-    val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
-    val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
-    val recordMergeStrategy = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-
-    if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
-      RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
-      tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
-      tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
-    }
-
     val defaultOpts = Map(
-      DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
-      DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode,
-      DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key() -> recordMergeStrategy,
       // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
       //       for the table
       HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
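
Note on the removal above: with the hard-coded payload class, merge mode, and merge
strategy defaults gone from defaultOpts, whatever the table config persists wins when
the option maps are combined. A minimal illustration of that precedence in plain Scala
(the maps below are illustrative, not the actual combineOptions internals):

    // Later maps override earlier keys on concatenation, so once no competing
    // default is injected, the table-config value takes precedence.
    val defaults = Map(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> "true")
    val tableConfigOpts = Map("hoodie.record.merge.mode" -> "COMMIT_TIME_ORDERING")
    val combined = defaults ++ tableConfigOpts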
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f90fbef8e82..d6545911c1b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -28,14 +28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
 import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.JFunction.scalaFunction1Noop
+
 import org.apache.avro.Schema
 import org.apache.spark.sql._
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast, attributeEquals}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals, MatchCast}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, EqualTo, Expression, Literal, NamedExpression, PredicateHelper}
@@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
 import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference, encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments}
+import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String, stripCasting, toStructType, userGuideString, validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
 import org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
 
 import java.util.Base64
+
 import scala.collection.JavaConverters._
 
 /**
@@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // Precombine field and record key field must be present in the assignment clause of all insert actions for event time ordering mode.
     // Check has no effect if we don't have such fields in target table or we don't have insert actions
     // Please note we are relying on merge mode in the table config as writer merge mode is always "CUSTOM" for MIT.
-    if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]], HoodieTableConfig.RECORD_MERGE_MODE)
-          .equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) {
+    if (RecordMergeMode.EVENT_TIME_ORDERING.name()
+      .equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String, Object]],
+        HoodieTableConfig.RECORD_MERGE_MODE))) {
       insertActions.foreach(action =>
         hoodieCatalogTable.preCombineKey.foreach(
           field => {
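
A note on the reordered equality check above: the merge mode read from the table config
can be null (for example, a version 6 table that never persisted it), so the fix calls
.equals on the non-null constant. A minimal sketch of the pattern (values illustrative):

    val mergeMode: String = null // e.g. missing from a version 6 table config
    // Old order: mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name()) => NullPointerException
    // New order: the constant is never null, so the check safely returns false
    RecordMergeMode.EVENT_TIME_ORDERING.name().equals(mergeMode)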
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 16187ba3fd9..0df0f89922b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.storage.HoodieStorage
 import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
 
 import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 import org.slf4j.LoggerFactory
@@ -323,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     }
   }
 
-  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*)(f: => Unit): Unit = {
+  protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+                                         )(f: => Unit): Unit = {
+    withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e, true)): _*)(f)
+  }
+
+  protected def withSparkSqlSessionConfigWithCondition(configNameValues: ((String, String), Boolean)*
+                                                      )(f: => Unit): Unit = {
     try {
-      configNameValues.foreach { case (configName, configValue) =>
-        spark.sql(s"set $configName=$configValue")
+      configNameValues.foreach { case ((configName, configValue), condition) =>
+        if (condition) {
+          spark.sql(s"set $configName=$configValue")
+        }
       }
       f
     } finally {
-      configNameValues.foreach { case (configName, _) =>
+      configNameValues.foreach { case ((configName, configValue), condition) =>
         spark.sql(s"reset $configName")
       }
     }
@@ -375,7 +386,19 @@ object HoodieSparkSqlTestBase {
       .getActiveTimeline.getInstantDetails(cleanInstant).get)
   }
 
+  def validateTableConfig(storage: HoodieStorage,
+                          basePath: String,
+                          expectedConfigs: Map[String, String],
+                          nonExistentConfigs: Seq[String]): Unit = {
+    val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
+    expectedConfigs.foreach(e => {
+      assertEquals(e._2, tableConfig.getString(e._1),
+        s"Table config ${e._1} should be ${e._2} but is 
${tableConfig.getString(e._1)}")
+    })
+    nonExistentConfigs.foreach(e => assertFalse(
+      tableConfig.contains(e), s"$e should not be present in the table 
config"))
+  }
+
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
-
 }
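
A minimal usage sketch of the two helpers added above, mirroring how the new tests
call them (the base path, table version, and config values are illustrative):

    withSparkSqlSessionConfigWithCondition(
      ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
      // the Boolean gates whether the session config is set at all
      ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
    ) {
      // run SQL against the table, then assert on the persisted table config
      validateTableConfig(storage, basePath,
        Map(HoodieTableConfig.VERSION.key -> "6"),
        Seq(HoodieTableConfig.RECORD_MERGE_MODE.key))
    }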
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index 1e504fecd0c..bd8f7676e02 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-
 import org.slf4j.LoggerFactory
 
 class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
@@ -1254,7 +1253,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
                |  ts BIGINT
                |) using hudi
                |TBLPROPERTIES (
-               |  type = 'cow',
+               |  type = '$tableType',
                |  primaryKey = 'id',
                |  preCombineField = 'ts',
                |  recordMergeMode = '$recordMergeMode'
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index a0af78797e6..67e7d992b19 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -17,15 +17,76 @@
 
 package org.apache.spark.sql.hudi.dml
 
+import org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING
+import org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
 
 class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
 
-  Seq("mor").foreach { tableType =>
-    // [HUDI-8850] For COW commit time ordering does not work.
-    // Seq("cow", "mor").foreach { tableType =>
-    test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") {
-      
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> 
"0"
+  // TODO(HUDI-8938): add "mor,true,true,6" after the fix
+  Seq(
+    "cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
+    "cow,6,true,false", "cow,6,true,true",
+    "mor,8,false,false", "mor,8,false,true", "mor,8,true,false",
+    "mor,6,true,false").foreach { args =>
+    val argList = args.split(',')
+    val tableType = argList(0)
+    val tableVersion = argList(1)
+    val setRecordMergeConfigs = argList(2).toBoolean
+    val setUpsertOperation = argList(3).toBoolean
+    val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 && setRecordMergeConfigs)
+    val storage = HoodieTestUtils.getDefaultStorage
+    val mergeConfigClause = if (setRecordMergeConfigs) {
+      // with precombine field set, UPSERT operation is used automatically
+      if (tableVersion.toInt == 6) {
+        // Table version 6
+        s", payloadClass = '${classOf[OverwriteWithLatestAvroPayload].getName}'"
+      } else {
+        // Current table version (8)
+        ", preCombineField = 'ts',\nhoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'"
+      }
+    } else {
+      // By default, the COMMIT_TIME_ORDERING is used if not specified by the user
+      ""
+    }
+    val writeTableVersionClause = if (tableVersion.toInt == 6) {
+      s"hoodie.write.table.version = $tableVersion,"
+    } else {
+      ""
+    }
+    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+      Map(
+        HoodieTableConfig.VERSION.key -> "6",
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName)
+    } else {
+      Map(
+        HoodieTableConfig.VERSION.key -> "8",
+        HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+    }
+    val nonExistentConfigs = if (tableVersion.toInt == 6) {
+      Seq(HoodieTableConfig.RECORD_MERGE_MODE.key, HoodieTableConfig.PRECOMBINE_FIELD.key)
+    } else {
+      if (setRecordMergeConfigs) {
+        Seq()
+      } else {
+        Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+      }
+    }
+
+    test(s"Test $tableType table with COMMIT_TIME_ORDERING 
(tableVersion=$tableVersion,"
+      + 
s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)")
 {
+      withSparkSqlSessionConfigWithCondition(
+        ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+        ("hoodie.spark.sql.insert.into.operation" -> "upsert", 
setUpsertOperation),
+        // TODO(HUDI-8820): enable MDT after supporting MDT with table version 
6
+        ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
       ) {
         withRecordType()(withTempDir { tmp =>
           val tableName = generateTableName
@@ -39,14 +100,15 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
                |  ts long
                | ) using hudi
                | tblproperties (
+               |  $writeTableVersionClause
                |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'ts',
-               |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
+               |  primaryKey = 'id'
+               |  $mergeConfigClause
                | )
                | location '${tmp.getCanonicalPath}'
              """.stripMargin)
-
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           // Insert initial records with ts=100
           spark.sql(
             s"""
@@ -59,99 +121,119 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
           // Verify inserting records with the same ts value are visible (COMMIT_TIME_ORDERING)
           spark.sql(
             s"""
-              | insert into $tableName
-              | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
-              | union all
-              | select 2, 'B_equal', 70.0, 100
+               | insert into $tableName
+               | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+               | union all
+               | select 2, 'B_equal', 70.0, 100
             """.stripMargin)
-
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-            Seq(1, "A_equal", 60.0, 100),
-            Seq(2, "B_equal", 70.0, 100)
-          )
+            (if (isUpsert) {
+              // With UPSERT operation, there is no duplicate
+              Seq(
+                Seq(1, "A_equal", 60.0, 100),
+                Seq(2, "B_equal", 70.0, 100))
+            } else {
+              // With INSERT operation, there are duplicates
+              Seq(
+                Seq(1, "A", 10.0, 100),
+                Seq(1, "A_equal", 60.0, 100),
+                Seq(2, "B", 20.0, 100),
+                Seq(2, "B_equal", 70.0, 100))
+            }): _*)
 
-          // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-               | update $tableName
-               | set price = 50.0, ts = 100
-               | where id = 1
+          if (isUpsert) {
+            // Verify updating records with the same ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                 | update $tableName
+                 | set price = 50.0, ts = 100
+                 | where id = 1
              """.stripMargin)
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+              Seq(1, "A_equal", 50.0, 100),
+              Seq(2, "B_equal", 70.0, 100))
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "A_equal", 50.0, 100),
-            Seq(2, "B_equal", 70.0, 100)
-          )
-
-          // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-               | insert into $tableName
-               | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
-               | union all
-               | select 2, 'B', 40.0, 99
+            // Verify inserting records with a lower ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                 | insert into $tableName
+                 | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+                 | union all
+                 | select 2, 'B', 40.0, 99
              """.stripMargin)
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "A", 30.0, 99),
-            Seq(2, "B", 40.0, 99)
-          )
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(1, "A", 30.0, 99),
+              Seq(2, "B", 40.0, 99))
 
-          // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-               | update $tableName
-               | set price = 50.0, ts = 98
-               | where id = 1
+            // Verify updating records with a lower ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                 | update $tableName
+                 | set price = 50.0, ts = 98
+                 | where id = 1
              """.stripMargin)
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "A", 50.0, 98),
-            Seq(2, "B", 40.0, 99)
-          )
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(1, "A", 50.0, 98),
+              Seq(2, "B", 40.0, 99))
 
-          // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-               | insert into $tableName
-               | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
-               | union all
-               | select 2, 'B', 40.0, 101
+            // Verify inserting records with a higher ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                 | insert into $tableName
+                 | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+                 | union all
+                 | select 2, 'B', 40.0, 101
              """.stripMargin)
 
-          // Verify records with ts=101 are visible
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "A", 30.0, 101),
-            Seq(2, "B", 40.0, 101)
-          )
+            // Verify records with ts=101 are visible
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(1, "A", 30.0, 101),
+              Seq(2, "B", 40.0, 101)
+            )
 
-          // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
-          spark.sql(
-            s"""
-               | update $tableName
-               | set price = 50.0, ts = 102
-               | where id = 1
+            // Verify updating records with a higher ts value are visible (COMMIT_TIME_ORDERING)
+            spark.sql(
+              s"""
+                 | update $tableName
+                 | set price = 50.0, ts = 102
+                 | where id = 1
              """.stripMargin)
 
-          // Verify final state after all operations
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "A", 50.0, 102),
-            Seq(2, "B", 40.0, 101)
-          )
-
-          // Delete record
-          spark.sql(s"delete from $tableName where id = 1")
+            // Verify final state after all operations
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(1, "A", 50.0, 102),
+              Seq(2, "B", 40.0, 101)
+            )
 
-          // Verify deletion
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(2, "B", 40.0, 101)
-          )
+            // Delete record
+            spark.sql(s"delete from $tableName where id = 1")
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            // Verify deletion
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(2, "B", 40.0, 101)
+            )
+          }
         })
       }
     }
 
-    test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType 
table") {
-      
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> 
"0") {
+    // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW
+    test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
+      + s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs,"
+      + s"setUpsertOperation=$setUpsertOperation)") {
+      withSparkSqlSessionConfigWithCondition(
+        ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+        ("hoodie.spark.sql.insert.into.operation" -> "upsert", setUpsertOperation),
+        // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
+        ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
+      ) {
         withRecordType()(withTempDir { tmp =>
           val tableName = generateTableName
           // Create table with COMMIT_TIME_ORDERING
@@ -164,82 +246,103 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
                |  ts long
                | ) using hudi
                | tblproperties (
+               |  $writeTableVersionClause
                |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'ts',
-               |  hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
+               |  primaryKey = 'id'
+               |  $mergeConfigClause
                | )
                | location '${tmp.getCanonicalPath}'
-             """.stripMargin)
+           """.stripMargin)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
           // Insert initial records
           spark.sql(
             s"""
                | insert into $tableName
-               | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union all
-               | select 0, 'X', 20.0, 100 union all
-               | select 2, 'B', 20.0, 100 union all
-               | select 3, 'C', 30.0, 100 union all
-               | select 4, 'D', 40.0, 100 union all
-               | select 5, 'E', 50.0, 100 union all
-               | select 6, 'F', 60.0, 100
-             """.stripMargin)
+               | select 1 as id, 'A' as name, 10.0 as price, 100L as ts union all
+               | select 0, 'X', 20.0, 100L union all
+               | select 2, 'B', 20.0, 100L union all
+               | select 3, 'C', 30.0, 100L union all
+               | select 4, 'D', 40.0, 100L union all
+               | select 5, 'E', 50.0, 100L union all
+               | select 6, 'F', 60.0, 100L
+           """.stripMargin)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
+          // TODO(HUDI-8840): enable MERGE INTO with deletes
+          val shouldTestMergeIntoDelete = setRecordMergeConfigs && tableVersion.toInt == 8
           // Merge operation - delete with higher, lower and equal ordering field value, all should take effect.
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 1 as id, 'B2' as name, 25.0 as price, 101 as ts union all
-               |   select 2, '', 55.0, 99 as ts union all
-               |   select 0, '', 55.0, 100 as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then delete
-             """.stripMargin)
+          if (shouldTestMergeIntoDelete) {
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
+                 |   select 2, '', 55.0, 99L as ts union all
+                 |   select 0, '', 55.0, 100L as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then delete
+           """.stripMargin)
+          }
 
           // Merge operation - update with mixed ts values
           spark.sql(
             s"""
                | merge into $tableName t
                | using (
-               |   select 4 as id, 'D2' as name, 45.0 as price, 101 as ts union all
-               |   select 5, 'E2', 55.0, 99 as ts union all
-               |   select 6, 'F2', 65.0, 100 as ts
+               |   select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
+               |   select 5, 'E2', 55.0, 99L as ts union all
+               |   select 6, 'F2', 65.0, 100L as ts
                | ) s
                | on t.id = s.id
                | when matched then update set *
-             """.stripMargin)
+           """.stripMargin)
 
           // Verify state after merges
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+          val nonDeletedRows: Seq[Seq[Any]] = if (shouldTestMergeIntoDelete) {
+            Seq()
+          } else {
+            Seq(Seq(0, "X", 20.0, 100),
+              Seq(1, "A", 10.0, 100),
+              Seq(2, "B", 20.0, 100))
+          }
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100)
-          )
+            (nonDeletedRows ++ Seq(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100)
+            )): _*)
 
           // Insert new records through merge
           spark.sql(
             s"""
                | merge into $tableName t
                | using (
-               |   select 7 as id, 'D2' as name, 45.0 as price, 100 as ts union all
-               |   select 8, 'E2', 55.0, 100 as ts
+               |   select 7 as id, 'D2' as name, 45.0 as price, 100L as ts union all
+               |   select 8, 'E2', 55.0, 100L as ts
                | ) s
                | on t.id = s.id
                | when not matched then insert *
-             """.stripMargin)
+           """.stripMargin)
 
           // Verify final state
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E2", 55.0, 99),
-            Seq(6, "F2", 65.0, 100),
-            Seq(7, "D2", 45.0, 100),
-            Seq(8, "E2", 55.0, 100)
-          )
+            (nonDeletedRows ++ Seq(
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E2", 55.0, 99),
+              Seq(6, "F2", 65.0, 100),
+              Seq(7, "D2", 45.0, 100),
+              Seq(8, "E2", 55.0, 100)
+            )): _*)
         })
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index f0ffbc3a38a..69a5c83d6ae 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -18,16 +18,68 @@
 package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
 
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
 
 class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
 
-  Seq("cow", "mor").foreach { tableType =>
-    test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
-      withSparkSqlSessionConfig(
-        "hoodie.merge.small.file.group.candidates.limit" -> "0",
-        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
+  // TODO(HUDI-8938): add "mor,6,true", "mor,6,false" after the fix
+  Seq("cow,8,true", "cow,8,false", "cow,6,true", "cow,6,false",
+    "mor,8,true", "mor,8,false").foreach { args =>
+    val argList = args.split(',')
+    val tableType = argList(0)
+    val tableVersion = argList(1)
+    val setRecordMergeConfigs = argList(2).toBoolean
+    val storage = HoodieTestUtils.getDefaultStorage
+    val mergeConfigClause = if (setRecordMergeConfigs) {
+      if (tableVersion.toInt == 6) {
+        // Table version 6
+        s", payloadClass = '${classOf[DefaultHoodieRecordPayload].getName}'"
+      } else {
+        // Current table version (8)
+        ", hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'"
+      }
+    } else {
+      ""
+    }
+    val writeTableVersionClause = if (tableVersion.toInt == 6) {
+      s"hoodie.write.table.version = $tableVersion,"
+    } else {
+      ""
+    }
+    val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+      Map(
+        HoodieTableConfig.VERSION.key -> "6",
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName,
+        HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts"
+      )
+    } else {
+      Map(
+        HoodieTableConfig.VERSION.key -> "8",
+        HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts",
+        HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
+        HoodieTableConfig.PAYLOAD_CLASS_NAME.key -> classOf[DefaultHoodieRecordPayload].getName,
+        HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key -> EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+    }
+    val nonExistentConfigs = if (tableVersion.toInt == 6) {
+      Seq(HoodieTableConfig.RECORD_MERGE_MODE.key)
+    } else {
+      Seq()
+    }
+
+    test(s"Test $tableType table with EVENT_TIME_ORDERING 
(tableVersion=$tableVersion,"
+      + s"setRecordMergeConfigs=$setRecordMergeConfigs)") {
+      withSparkSqlSessionConfigWithCondition(
+        ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+        (DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true", true),
+        // TODO(HUDI-8820): enable MDT after supporting MDT with table version 6
+        ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
       ) {
         withRecordType()(withTempDir { tmp =>
           val tableName = generateTableName
@@ -41,13 +93,16 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
                |  ts long
                | ) using hudi
                | tblproperties (
+               |  $writeTableVersionClause
                |  type = '$tableType',
                |  primaryKey = 'id',
-               |  preCombineField = 'ts',
-               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+               |  preCombineField = 'ts'
+               |  $mergeConfigClause
                | )
                | location '${tmp.getCanonicalPath}'
              """.stripMargin)
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
           // Insert initial records with ts=100
           spark.sql(
@@ -67,6 +122,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
                | select 2, 'B_equal', 70.0, 100
             """.stripMargin)
 
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
             Seq(1, "A_equal", 60.0, 100),
             Seq(2, "B_equal", 70.0, 100)
@@ -80,6 +137,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
                | where id = 1
              """.stripMargin)
 
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
             Seq(1, "A_equal", 50.0, 100),
             Seq(2, "B_equal", 70.0, 100)
@@ -94,6 +153,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
                | select 2, 'B', 40.0, 99
              """.stripMargin)
 
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
             Seq(1, "A_equal", 50.0, 100),
             Seq(2, "B_equal", 70.0, 100)
@@ -107,6 +168,8 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
                | where id = 1
              """.stripMargin)
 
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
             Seq(1, "A_equal", 50.0, 100),
             Seq(2, "B_equal", 70.0, 100)
@@ -164,110 +227,124 @@ class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
           // Delete record with no ts.
           spark.sql(s"delete from $tableName where id = 1")
           // Verify deletion
+          validateTableConfig(
+            storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
           checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
             Seq(2, "B", 40.0, 101)
           )
         })
       }
     }
-  }
 
-  Seq("mor").foreach { tableType =>
-    // [HUDI-8915]: COW MIT delete does not honor event time ordering. For 
update we have the coverage in
-    // "Test MergeInto with commit time/event time ordering coverage".
-    //  Seq("cow", "mor").foreach { tableType =>
-    test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType 
table") {
-      
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> 
"0") {
-        withRecordType()(withTempDir { tmp =>
-          val tableName = generateTableName
-          // Create table with EVENT_TIME_ORDERING
-          spark.sql(
-            s"""
-               | create table $tableName (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts long
-               | ) using hudi
-               | tblproperties (
-               |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'ts',
-               |  hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
-               | )
-               | location '${tmp.getCanonicalPath}'
+    if ("mor".equals(tableType)) {
+      // [HUDI-8915]: COW MIT delete does not honor event time ordering. For 
update we have the coverage in
+      // "Test MergeInto with commit time/event time ordering coverage".
+      //  Seq("cow", "mor").foreach { tableType =>
+      test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType 
table "
+        + 
s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs)") {
+        withSparkSqlSessionConfigWithCondition(
+          ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+          // TODO(HUDI-8820): enable MDT after supporting MDT with table 
version 6
+          ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
+        ) {
+          withRecordType()(withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create table with EVENT_TIME_ORDERING
+            spark.sql(
+              s"""
+                 | create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 | ) using hudi
+                 | tblproperties (
+                 |  $writeTableVersionClause
+                 |  type = '$tableType',
+                 |  primaryKey = 'id',
+                 |  preCombineField = 'ts'
+                 |  $mergeConfigClause
+                 | )
+                 | location '${tmp.getCanonicalPath}'
              """.stripMargin)
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
 
-          // Insert initial records with ts=100
-          spark.sql(
-            s"""
-               | insert into $tableName
-               | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union all
-               | select 1, 'A', 10.0, 100L union all
-               | select 2, 'B', 20.0, 100L union all
-               | select 3, 'C', 30.0, 100L union all
-               | select 4, 'D', 40.0, 100L union all
-               | select 5, 'E', 50.0, 100L union all
-               | select 6, 'F', 60.0, 100L
+            // Insert initial records with ts=100
+            spark.sql(
+              s"""
+                 | insert into $tableName
+                 | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union all
+                 | select 1, 'A', 10.0, 100L union all
+                 | select 2, 'B', 20.0, 100L union all
+                 | select 3, 'C', 30.0, 100L union all
+                 | select 4, 'D', 40.0, 100L union all
+                 | select 5, 'E', 50.0, 100L union all
+                 | select 6, 'F', 60.0, 100L
              """.stripMargin)
 
-          // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect.
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 0 as id, 'B2' as name, 25.0 as price, 100L as ts union all
-               |   select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
-               |   select 2 as id, 'B2' as name, 25.0 as price, 99L as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then delete
+            // Merge operation - delete with arbitrary ts value (lower, equal and higher). Lower ts won't take effect.
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 0 as id, 'B2' as name, 25.0 as price, 100L as ts union all
+                 |   select 1 as id, 'B2' as name, 25.0 as price, 101L as ts union all
+                 |   select 2 as id, 'B2' as name, 25.0 as price, 99L as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then delete
              """.stripMargin)
 
-          // Merge operation - update with mixed ts values (only equal or higher ts should take effect)
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
-               |   select 5, 'E2', 55.0, 99L as ts union all
-               |   select 6, 'F2', 65.0, 100L as ts
-               | ) s
-               | on t.id = s.id
-               | when matched then update set *
+            // Merge operation - update with mixed ts values (only equal or higher ts should take effect)
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 4 as id, 'D2' as name, 45.0 as price, 101L as ts union all
+                 |   select 5, 'E2', 55.0, 99L as ts union all
+                 |   select 6, 'F2', 65.0, 100L as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when matched then update set *
              """.stripMargin)
 
-          // Verify state after merges
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(2, "B", 20.0, 100),
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E", 50.0, 100),
-            Seq(6, "F2", 65.0, 100)
-          )
+            // Verify state after merges
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(2, "B", 20.0, 100),
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E", 50.0, 100),
+              Seq(6, "F2", 65.0, 100)
+            )
 
-          // Insert new records through merge
-          spark.sql(
-            s"""
-               | merge into $tableName t
-               | using (
-               |   select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all
-               |   select 8, 'H', 80.0, 99 as ts
-               | ) s
-               | on t.id = s.id
-               | when not matched then insert *
+            // Insert new records through merge
+            spark.sql(
+              s"""
+                 | merge into $tableName t
+                 | using (
+                 |   select 7 as id, 'G' as name, 70.0 as price, 99 as ts union all
+                 |   select 8, 'H', 80.0, 99 as ts
+                 | ) s
+                 | on t.id = s.id
+                 | when not matched then insert *
              """.stripMargin)
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(2, "B", 20.0, 100),
-            Seq(3, "C", 30.0, 100),
-            Seq(4, "D2", 45.0, 101),
-            Seq(5, "E", 50.0, 100),
-            Seq(6, "F2", 65.0, 100),
-            Seq(7, "G", 70.0, 99),
-            Seq(8, "H", 80.0, 99)
-          )
-        })
+            validateTableConfig(
+              storage, tmp.getCanonicalPath, expectedMergeConfigs, nonExistentConfigs)
+            checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
+              Seq(2, "B", 20.0, 100),
+              Seq(3, "C", 30.0, 100),
+              Seq(4, "D2", 45.0, 101),
+              Seq(5, "E", 50.0, 100),
+              Seq(6, "F2", 65.0, 100),
+              Seq(7, "G", 70.0, 99),
+              Seq(8, "H", 80.0, 99)
+            )
+          })
+        }
       }
     }
   }
