nsivabalan commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2274228608


##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -121,10 +132,35 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final String PARTIAL_UPDATE_CUSTOM_MARKER = "hoodie.write.partial.update.custom.marker";
   public static final String DEBEZIUM_UNAVAILABLE_VALUE = "__debezium_unavailable_value";
   // This prefix is used to set merging related properties.
-  // A reader might need to read some merger properties to function as expected,
+  // A reader might need to read some writer properties to function as expected,
   // and Hudi stores properties with this prefix so the reader parses these properties,
   // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...) to use.
-  public static final String MERGE_PROPERTIES_PREFIX = "hoodie.table.merge.properties.";
+  public static final String MERGE_CUSTOM_PROPERTY_PREFIX = "hoodie.merge.custom.property.prefix.";

Review Comment:
   Why change the key name here? We don't need "prefix" as part of the key; `hoodie.merge.custom.property` would suffice.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  public String getLegacyPayloadClass() {
+    return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+  }
+
   public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table Version 9,
+   * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+   * This approach fits the same behavior of upgrade from 8 to 9.
+   */
+  public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,

Review Comment:
   For the future:
   we might need to call this for table version 9 and above, not just 9.
   Also, we should only be calling this if the payload class is not null.
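The guard being asked for could be sketched as follows. This is a minimal illustration, not the actual Hudi code: `TableVersion` here is a stand-in enum, not Hudi's `HoodieTableVersion`.

```java
// Illustrative stand-in for Hudi's table version enum (an assumption, not the real class).
enum TableVersion {
  EIGHT(8), NINE(9), TEN(10);
  final int code;
  TableVersion(int code) { this.code = code; }
}

class MergeConfigGuard {
  // Sketch of the reviewer's ask: run the version-9 inference for version 9
  // AND above, and only when a payload class is actually configured.
  static boolean shouldInferMergingConfigs(TableVersion version, String payloadClassName) {
    return version.code >= TableVersion.NINE.code
        && payloadClassName != null
        && !payloadClassName.isEmpty();
  }
}
```

The key difference from the PR's current check is the `>=` comparison instead of an exact equality against version 9, plus the null/empty payload-class guard.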



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  public String getLegacyPayloadClass() {
+    return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+  }
+
   public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table Version 9,
+   * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+   * This approach fits the same behavior of upgrade from 8 to 9.
+   */
+  public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String payloadClassName,
+                                                                   String recordMergeStrategyId,
+                                                                   String orderingFieldName,
+                                                                   HoodieTableVersion tableVersion) {
+    Map<String, String> reconciledConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return reconciledConfigs;
+    }
+
+    // Step 1: Infer merging configs based on input information.
+    // This step is important since it provides the same configs before we do table upgrade.
+    // Then additional logic for table version 9 could be verified.
+    Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+    recordMergeMode = inferredConfigs.getLeft();
+    recordMergeStrategyId = inferredConfigs.getRight();
+
+    // Step 2: Handle Version 9 specific logic.
+    // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+    //   NOTE: Payload class should NOT be set for these cases.
+    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+        || StringUtils.isNullOrEmpty(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+    } else {
+      // For tables using payload classes.
+      //   CASE 2: Custom payload class. We set these properties explicitly.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+        reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+        reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      } else { // CASE 3: Payload classes are under deprecation.
+        // Standard merging configs.
+        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
+        if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+        } else {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        }
+        // Partial update mode config.
+        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+            || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
+        } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_MARKERS.name());

Review Comment:
   Note to self: I am not sure if we will go with the entirety of https://github.com/apache/hudi/pull/13623.
   So, at least we need to pull in the changes not related to unification, like this ENUM renaming.
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1125,10 +1125,12 @@ class HoodieSparkSqlWriterInternal {
       HoodieTableVersion.fromVersionCode(
         SparkConfigUtils.getStringWithAltKeys(mergedParams, WRITE_TABLE_VERSION).toInt)
     }
+    // Handle merge properties.
     if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
       || !mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
       || !mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
-      val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+      // TODO: We should remove this inference since it anyways will be done when creating metaClient.

Review Comment:
   @linliu-code: why not clean it up right away?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java:
##########
@@ -152,10 +153,13 @@ public static <R> HoodieRecord<R> convertToHoodieRecordPayload(GenericRecord rec
       record = recordWithoutMetaFields;
     }
 
-    HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-        HoodieRecordUtils.loadPayload(payloadClazz, record, preCombineVal), operation);
-
-    return (HoodieRecord<R>) hoodieRecord;
+    if (StringUtils.isNullOrEmpty(payloadClazz)) {
+      return (HoodieRecord<R>) new HoodieAvroIndexedRecord(new HoodieKey(recKey, partitionPath), record);

Review Comment:
   Yes, we need to fix this once the COW merge handle migration is complete and all payload creation has been fixed.
   CC @the-other-tim-brown



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -216,6 +217,34 @@ object HoodieWriterUtils {
     validateTableConfig(spark, params, tableConfig, false)
   }
 
+  /**
+   * This function adds specific rules to choose the config key in table config for a given writer key.
+   *
+   * RULE 1: When
+   *   1. table version is 9,
+   *   2. writer key is a payload class key, and
+   *   3. table config has legacy payload class configured,
+   * then
+   *   return legacy payload class key.
+   *
+   * Basic rule:
+   *   return writer key.
+   */
+  def getKeyInTableConfig(key: String, tableConfig: HoodieConfig): String = {

Review Comment:
   Yes. Also, let's call this only in the case of the payload class config key and not for other cases.
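The restricted rule could be sketched as follows (in Java, for illustration, although the PR's function is Scala). The key-name constants here are assumptions for the sketch, not necessarily the PR's actual keys:

```java
import java.util.Map;

class TableConfigKeyRule {
  // Key names below are illustrative assumptions, not necessarily the PR's actual constants.
  static final String PAYLOAD_CLASS_KEY = "hoodie.compaction.payload.class";
  static final String LEGACY_PAYLOAD_CLASS_KEY = "hoodie.table.legacy.payload.class";

  // Sketch of the rule with the reviewer's restriction applied: only the
  // payload-class writer key is ever remapped; every other key passes through.
  static String keyInTableConfig(String writerKey, int tableVersion, Map<String, String> tableConfig) {
    boolean hasLegacyPayload = tableConfig.containsKey(LEGACY_PAYLOAD_CLASS_KEY)
        && !tableConfig.get(LEGACY_PAYLOAD_CLASS_KEY).isEmpty();
    if (writerKey.equals(PAYLOAD_CLASS_KEY) && tableVersion == 9 && hasLegacyPayload) {
      return LEGACY_PAYLOAD_CLASS_KEY;
    }
    return writerKey;
  }
}
```

Keeping the remap scoped to the payload-class key means all other writer keys resolve exactly as before, which limits the blast radius of the new rule.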



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  public String getLegacyPayloadClass() {
+    return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+  }
+
   public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table Version 9,
+   * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+   * This approach fits the same behavior of upgrade from 8 to 9.
+   */
+  public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String payloadClassName,
+                                                                   String recordMergeStrategyId,
+                                                                   String orderingFieldName,
+                                                                   HoodieTableVersion tableVersion) {
+    Map<String, String> reconciledConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return reconciledConfigs;
+    }
+
+    // Step 1: Infer merging configs based on input information.
+    // This step is important since it provides the same configs before we do table upgrade.
+    // Then additional logic for table version 9 could be verified.
+    Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+    recordMergeMode = inferredConfigs.getLeft();
+    recordMergeStrategyId = inferredConfigs.getRight();
+
+    // Step 2: Handle Version 9 specific logic.
+    // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+    //   NOTE: Payload class should NOT be set for these cases.
+    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+        || StringUtils.isNullOrEmpty(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());

Review Comment:
   Why are we trying to reconcile cases which should not be touched?
   For example, in the case of custom mergers, why even override any table props? We should just route the properties set by the user as-is.
   
   Or is there a chance we do some inference logic and set these props even for custom mergers?



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -260,12 +260,12 @@ private void initRecordMerger(TypedProperties properties, boolean isIngestion) {
     HoodieTableVersion tableVersion = tableConfig.getTableVersion();
     // If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
     if (isIngestion && providedPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), null,
+      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(null, providedPayloadClass.get(), null,
           tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
       recordMergeMode = triple.getLeft();
       mergeStrategyId = triple.getRight();
     } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
-      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
+      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(

Review Comment:
   Let's fix the condition at L267 to `< 8` instead of `!>= 8`.
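The two forms are logically equivalent; the ask is purely about readability. A small sketch using plain version-code comparisons (a stand-in, not the real `HoodieTableVersion` API):

```java
class VersionCheck {
  static final int EIGHT = 8;

  // The form currently in the PR: a negated greater-than-or-equals.
  static boolean negatedForm(int versionCode) {
    return !(versionCode >= EIGHT);
  }

  // The form the reviewer suggests: a direct less-than, which reads better.
  static boolean directForm(int versionCode) {
    return versionCode < EIGHT;
  }
}
```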



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -554,7 +554,7 @@ protected List<HoodieRecord<RawTripTestPayload>> dedupForCopyOnWriteStorage(Hood
         .getReaderContextFactoryForWrite(metaClient, HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
     List<String> orderingFieldNames = getOrderingFieldNames(
         readerContext.getMergeMode(), writeClient.getConfig().getProps(), metaClient);
-    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, writeConfig.getPayloadClass(), null,
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferBasicMergingBehavior(null, writeConfig.getPayloadClass(), null,

Review Comment:
   Not sure the naming change is apt; can we leave the naming as-is?



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -260,12 +260,12 @@ private void initRecordMerger(TypedProperties properties, boolean isIngestion) {
     HoodieTableVersion tableVersion = tableConfig.getTableVersion();
     // If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
     if (isIngestion && providedPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), null,
+      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(null, providedPayloadClass.get(), null,

Review Comment:
   We should avoid calling `inferBasicMergingBehavior` for table version 9 unless we have special circumstances like this one.
   So, if we plan to fix the impl of `inferBasicMergingBehavior` to do an early return for table version 9 or above, at least we need to account for the expression payload use-cases.
   
   Something to be mindful of.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1858,16 +1858,18 @@ public void testPayloadClassUpdate() throws Exception {
         true, false, null, "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     assertRecordCount(1000, dataSetBasePath, sqlContext);
+    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), DefaultHoodieRecordPayload.class.getName());
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
         true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
-    //now assert that hoodie.properties file now has updated payload class name
-    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
-    assertEquals(metaClient.getTableConfig().getPayloadClass(), DummyAvroPayload.class.getName());
+    // NOTE: Payload class cannot be updated, though the write can be executed using different payload classes in the runtime.
+    metaClient = HoodieTableMetaClient.reload(metaClient);

Review Comment:
   I don't think this is a valid use-case.
   In fact, I am surprised we have these loose ends with deltastreamer. The expected behavior is that we should not let users update the payload class once the table is created.
   We should chase down this gap and fix it for HoodieStreamer, and then fix this test to assert that an exception is thrown when the payload class changes.
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -186,14 +187,18 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg,
                                                        boolean hasBuiltInDelete,
                                                        Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
                                                        boolean hasInstantRange) {
-    Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
-        cfg.getRecordMergeMode(),
-        cfg.getPayloadClass(),
-        cfg.getRecordMergeStrategyId(),
-        cfg.getPreCombineFieldsStr().orElse(null),
-        cfg.getTableVersion());
-
-    if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
+    RecordMergeMode mergeMode = cfg.getRecordMergeMode();
+    if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {

Review Comment:
   If expression payload is involved, we might need to call into this even for table v9.
   @the-other-tim-brown @lokeshj1703: can you confirm?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java:
##########
@@ -182,16 +184,32 @@ static String getPayloadClassName(HoodieConfig config) {
   }
 
   static String getPayloadClassName(Properties props) {
-    return getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+    Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
+    if (payloadOpt.isPresent()) {
+      return payloadOpt.get();
+    }
+    // Note: starting from version 9, payload class is not necessarily set, but
+    //       merge mode must exist. Therefore, we use merge mode to infer
+    //       the payload class for certain corner cases, like for the MIT command.
+    if (props.containsKey(RECORD_MERGE_MODE.key())
+        && props.getProperty(RECORD_MERGE_MODE.key()).equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+      return OverwriteWithLatestAvroPayload.class.getName();
+    }
+    return DEFAULT_PAYLOAD_CLASS_NAME;
   }
 
+  // NOTE: PAYLOAD_CLASS_NAME is checked before LEGACY_PAYLOAD_CLASS_NAME to make sure
+  // a temporary payload class setting is respected.
   static Option<String> getPayloadClassNameIfPresent(Properties props) {
     String payloadClassName = null;
     if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
       payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
+    } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {

Review Comment:
   We should be wary of returning even the legacy payload when someone calls `HoodieRecordPayload.getPayloadClassName(Properties props)`.
   
   The legacy payload should only be used in downgrade code paths, during which we can get the entire list of table properties and look it up with an explicit key; we don't need a direct getter from HoodieTableConfig.
   
   So, let's avoid this. It might unintentionally expose payload classes to readers as well, which is not the intent for v9 and above.
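The explicit-key lookup being suggested for the downgrade path could look roughly like this. The key name is an assumed placeholder for illustration; the PR defines the real constant:

```java
import java.util.Properties;

class DowngradePayloadLookup {
  // Assumed key name for illustration only; the PR defines the actual constant.
  static final String LEGACY_PAYLOAD_CLASS_KEY = "hoodie.table.legacy.payload.class";

  // Sketch of the reviewer's suggestion: downgrade code reads the legacy
  // payload class directly from the table properties with an explicit key,
  // instead of routing it through the general payload-class resolution path
  // that readers also use.
  static String legacyPayloadClassForDowngrade(Properties tableProps) {
    return tableProps.getProperty(LEGACY_PAYLOAD_CLASS_KEY, "");
  }
}
```

Keeping the lookup local to the downgrade code means the general `getPayloadClassName` resolution never sees the legacy key, so readers on v9+ tables are not accidentally handed a payload class.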
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -78,27 +95,41 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
     val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
     secondUpdate.write.format("hudi").
       option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
-      options(opts).
       mode(SaveMode.Append).
       save(basePath)
-    // 4. Validate.
-    val df = spark.read.format("hudi").options(opts).load(basePath)
+    // 4. Add a trivial update to trigger payload class mismatch.
+    val thirdUpdateData = Seq(
+      (12, "3", "rider-CC", "driver-CC", 33.90, "i"))
+    val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+    if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {

Review Comment:
   Why do we need to test this in this test?
   Why not add a separate, simple test just for updating to a different payload class? We do not need to test this for all the different payload classes; doing it for one combination should suffice.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
     return HoodieRecordPayload.getPayloadClassName(this);
   }
 
+  public String getLegacyPayloadClass() {
+    return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+  }
+
   public String getRecordMergeStrategyId() {
     return getString(RECORD_MERGE_STRATEGY_ID);
   }
 
+  /**
+   * Handle table config creation logic when creating a table for Table Version 9,
+   * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+   * This approach fits the same behavior of upgrade from 8 to 9.
+   */
+  public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                   String payloadClassName,
+                                                                   String recordMergeStrategyId,
+                                                                   String orderingFieldName,
+                                                                   HoodieTableVersion tableVersion) {
+    Map<String, String> reconciledConfigs = new HashMap<>();
+    if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+      return reconciledConfigs;
+    }
+
+    // Step 1: Infer merging configs based on input information.
+    // This step is important since it provides the same configs before we do table upgrade.
+    // Then additional logic for table version 9 could be verified.
+    Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+        recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+    recordMergeMode = inferredConfigs.getLeft();
+    recordMergeStrategyId = inferredConfigs.getRight();
+
+    // Step 2: Handle Version 9 specific logic.
+    // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+    // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+    //   NOTE: Payload class should NOT be set for these cases.
+    if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+        || StringUtils.isNullOrEmpty(payloadClassName)) {
+      reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+      reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+    } else {
+      // For tables using payload classes.
+      //   CASE 2: Custom payload class. We set these properties explicitly.
+      if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+        reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+        reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+        reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+      } else { // CASE 3: Payload classes are under deprecation.
+        // Standard merging configs.
+        // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
+        if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+        } else {
+          reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
+          reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+          reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+        }
+        // Partial update mode config.
+        if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+            || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
+        } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_MARKERS.name());
+        }
+        // Additional custom properties.
+        if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+          reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+        } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) {
+          reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, OP_FIELD);
+          reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER, D_VALUE);
+        }
+      }
+    }
+    return reconciledConfigs;
+  }
+
   /**
    * Infers the merging behavior based on what the user sets (or doesn't set).
-   * Validates that the user has not set an illegal combination of configs
+   * Validates that the user has not set an illegal combination of configs.
+   * This function infers basic merging properties used by table version <= 8.
    */
-  public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
+  public static Triple<RecordMergeMode, String, String> inferBasicMergingBehavior(RecordMergeMode recordMergeMode,

Review Comment:
   Curious to understand why we do not have early returns in the impl of this method for table version 9 and above.
   
   Maybe we can special-case the expression payload, but otherwise we should not need to infer any merge-related props with table v9 and above.
   I remember explaining this ask for this entire patch before.
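The early return under discussion could be sketched as below. This is an illustrative sketch of the reviewer's ask, not the PR's code; the expression-payload class name is an assumption about the Spark SQL MERGE INTO payload:

```java
class InferenceEarlyReturn {
  // Assumed fully-qualified name of the payload used by Spark SQL MERGE INTO;
  // treat this as illustrative, not authoritative.
  static final String EXPRESSION_PAYLOAD = "org.apache.spark.sql.hudi.command.payload.ExpressionPayload";

  // Sketch of the early return discussed here: for table version 9 and above,
  // skip merge-config inference entirely unless the expression payload
  // (the MERGE INTO special case) is involved.
  static boolean needsInference(int tableVersionCode, String payloadClassName) {
    if (tableVersionCode >= 9) {
      return EXPRESSION_PAYLOAD.equals(payloadClassName);
    }
    return true;
  }
}
```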
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
