nsivabalan commented on code in PR #13615:
URL: https://github.com/apache/hudi/pull/13615#discussion_r2274228608
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -121,10 +132,35 @@ public class HoodieTableConfig extends HoodieConfig {
  public static final String PARTIAL_UPDATE_CUSTOM_MARKER = "hoodie.write.partial.update.custom.marker";
  public static final String DEBEZIUM_UNAVAILABLE_VALUE = "__debezium_unavailable_value";
  // This prefix is used to set merging related properties.
- // A reader might need to read some merger properties to function as expected,
+ // A reader might need to read some writer properties to function as expected,
  // and Hudi stores properties with this prefix so the reader parses these properties,
  // and produces a map of key value pairs (Key1->Value1, Key2->Value2, ...) to use.
- public static final String MERGE_PROPERTIES_PREFIX = "hoodie.table.merge.properties.";
+ public static final String MERGE_CUSTOM_PROPERTY_PREFIX = "hoodie.merge.custom.property.prefix.";
Review Comment:
why are we changing the key name here? we don't need "prefix" as part of the key; `hoodie.merge.custom.property` would suffice.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ public String getLegacyPayloadClass() {
+ return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+ }
+
public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+  * Handle table config creation logic when creating a table for Table Version 9,
+  * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+  * This approach fits the same behavior of upgrade from 8 to 9.
+  */
+ public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
Review Comment:
for future:
we might need to call this for table version 9 and above, not just 9.
also, we should only be calling it if the payload class is not null.
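A minimal sketch of the guard the reviewer is suggesting; `shouldInfer`, the integer version code, and the null/empty check are illustrative stand-ins for the real `HoodieTableVersion` and config plumbing, not the actual Hudi API:

```java
public class MergeConfigGuard {
  static final int VERSION_NINE = 9; // assumed code for HoodieTableVersion.NINE

  // Run the version-9 reconciliation for table version 9 *and above*, and only
  // when a payload class is actually configured, per the review ask.
  static boolean shouldInfer(int versionCode, String payloadClassName) {
    return versionCode >= VERSION_NINE
        && payloadClassName != null && !payloadClassName.isEmpty();
  }
}
```

With this shape, a future table version 10 still goes through the same reconciliation path, and tables without a payload class skip it entirely.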
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ public String getLegacyPayloadClass() {
+ return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+ }
+
public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+  * Handle table config creation logic when creating a table for Table Version 9,
+  * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+  * This approach fits the same behavior of upgrade from 8 to 9.
+  */
+ public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                  String payloadClassName,
+                                                                  String recordMergeStrategyId,
+                                                                  String orderingFieldName,
+                                                                  HoodieTableVersion tableVersion) {
+   Map<String, String> reconciledConfigs = new HashMap<>();
+   if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+     return reconciledConfigs;
+   }
+
+   // Step 1: Infer merging configs based on input information.
+   // This step is important since it provides the same configs before we do table upgrade.
+   // Then additional logic for table version 9 could be verified.
+   Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+       recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+   recordMergeMode = inferredConfigs.getLeft();
+   recordMergeStrategyId = inferredConfigs.getRight();
+
+   // Step 2: Handle Version 9 specific logic.
+   // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+   // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+   // NOTE: Payload class should NOT be set for these cases.
+   if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+       || StringUtils.isNullOrEmpty(payloadClassName)) {
+     reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+     reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+   } else {
+     // For tables using payload classes.
+     // CASE 2: Custom payload class. We set these properties explicitly.
+     if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+       reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+       reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+       reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+     } else { // CASE 3: Payload classes are under deprecation.
+       // Standard merging configs.
+       // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
+       if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+         reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
+         reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+       } else {
+         reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
+         reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+       }
+       // Partial update mode config.
+       if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+           || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
+         reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
+       } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+         reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_MARKERS.name());
Review Comment:
Note to self: I am not sure if we will go w/ the entirety of
https://github.com/apache/hudi/pull/13623.
So, at least we need to pull in the changes not related to unification, like
changing this ENUM naming.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1125,10 +1125,12 @@ class HoodieSparkSqlWriterInternal {
    HoodieTableVersion.fromVersionCode(
      SparkConfigUtils.getStringWithAltKeys(mergedParams, WRITE_TABLE_VERSION).toInt)
  }
+ // Handle merge properties.
  if (!mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_MODE.key())
    || !mergedParams.contains(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key())
    || !mergedParams.contains(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key())) {
-   val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+   // TODO: We should remove this inference since it anyways will be done when creating metaClient.
Review Comment:
@linliu-code: why not clean it up right away?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java:
##########
@@ -152,10 +153,13 @@ public static <R> HoodieRecord<R> convertToHoodieRecordPayload(GenericRecord rec
      record = recordWithoutMetaFields;
    }
-   HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-       HoodieRecordUtils.loadPayload(payloadClazz, record, preCombineVal), operation);
-
-   return (HoodieRecord<R>) hoodieRecord;
+   if (StringUtils.isNullOrEmpty(payloadClazz)) {
+     return (HoodieRecord<R>) new HoodieAvroIndexedRecord(new HoodieKey(recKey, partitionPath), record);
Review Comment:
yes, we need to fix this once COW merge handle migration is complete and all
payload creation has been fixed.
CC @the-other-tim-brown
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -216,6 +217,34 @@ object HoodieWriterUtils {
validateTableConfig(spark, params, tableConfig, false)
}
+ /**
+  * This function adds specific rules to choose config key in table config for a given writer key.
+  *
+  * RULE 1: When
+  *   1. table version is 9,
+  *   2. writer key is a payload class key, and
+  *   3. table config has legacy payload class configured,
+  * then
+  *   return legacy payload class key.
+  *
+  * Basic rule:
+  *   return writer key.
+  */
+ def getKeyInTableConfig(key: String, tableConfig: HoodieConfig): String = {
Review Comment:
yes; also, let's call this only in the case of the payload class config key and not
for other cases.
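The key-selection rules above can be sketched as a single pass-through function; the key strings and the integer version check here are hypothetical stand-ins for the real `HoodieTableConfig`/`DataSourceWriteOptions` constants (sketched in Java rather than the Scala of the surrounding file):

```java
public class TableConfigKeyResolver {
  // Hypothetical key names; the real ones live in the Hudi config classes.
  static final String PAYLOAD_CLASS_KEY = "hoodie.datasource.write.payload.class";
  static final String LEGACY_PAYLOAD_CLASS_KEY = "hoodie.table.legacy.payload.class";

  // Per the review: only the payload-class key is remapped, and only when the
  // table is version 9 with a legacy payload class present; every other writer
  // key passes through unchanged.
  static String resolve(String writerKey, int tableVersion, String legacyPayloadClass) {
    if (PAYLOAD_CLASS_KEY.equals(writerKey)
        && tableVersion == 9
        && legacyPayloadClass != null && !legacyPayloadClass.isEmpty()) {
      return LEGACY_PAYLOAD_CLASS_KEY;
    }
    return writerKey;
  }
}
```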
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ public String getLegacyPayloadClass() {
+ return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+ }
+
public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+  * Handle table config creation logic when creating a table for Table Version 9,
+  * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+  * This approach fits the same behavior of upgrade from 8 to 9.
+  */
+ public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                  String payloadClassName,
+                                                                  String recordMergeStrategyId,
+                                                                  String orderingFieldName,
+                                                                  HoodieTableVersion tableVersion) {
+   Map<String, String> reconciledConfigs = new HashMap<>();
+   if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+     return reconciledConfigs;
+   }
+
+   // Step 1: Infer merging configs based on input information.
+   // This step is important since it provides the same configs before we do table upgrade.
+   // Then additional logic for table version 9 could be verified.
+   Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+       recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+   recordMergeMode = inferredConfigs.getLeft();
+   recordMergeStrategyId = inferredConfigs.getRight();
+
+   // Step 2: Handle Version 9 specific logic.
+   // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+   // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+   // NOTE: Payload class should NOT be set for these cases.
+   if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+       || StringUtils.isNullOrEmpty(payloadClassName)) {
+     reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
Review Comment:
why are we trying to reconcile cases which should not be touched?
For e.g., in the case of custom mergers, why even override any table props? We
should just route the properties set by the user as is.
Or are there chances we do some inference logic and set these props even for
custom mergers?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -260,12 +260,12 @@ private void initRecordMerger(TypedProperties properties, boolean isIngestion) {
    HoodieTableVersion tableVersion = tableConfig.getTableVersion();
    // If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
    if (isIngestion && providedPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), null,
+     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(null, providedPayloadClass.get(), null,
          tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
      recordMergeMode = triple.getLeft();
      mergeStrategyId = triple.getRight();
    } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
-     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(
+     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(
Review Comment:
let's fix the condition on L267 to `lesserThan(EIGHT)` (i.e. `< 8`) instead of `!greaterThanOrEquals(EIGHT)` (i.e. `!(>= 8)`).
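The two predicates are logically identical; the ask is purely about readability. A small sketch, with minimal stand-ins for `HoodieTableVersion`'s comparison helpers (names assumed from the surrounding diff):

```java
public class VersionPredicate {
  // Minimal stand-ins for HoodieTableVersion's comparison helpers.
  static boolean greaterThanOrEquals(int v, int other) { return v >= other; }
  static boolean lesserThan(int v, int other) { return v < other; }

  // Direct "< 8" form, equivalent to !greaterThanOrEquals(versionCode, 8)
  // but easier to read at a glance.
  static boolean needsInference(int versionCode) {
    return lesserThan(versionCode, 8);
  }
}
```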
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -554,7 +554,7 @@ protected List<HoodieRecord<RawTripTestPayload>> dedupForCopyOnWriteStorage(Hood
        .getReaderContextFactoryForWrite(metaClient, HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
    List<String> orderingFieldNames = getOrderingFieldNames(
        readerContext.getMergeMode(), writeClient.getConfig().getProps(), metaClient);
-   RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, writeConfig.getPayloadClass(), null,
+   RecordMergeMode recordMergeMode = HoodieTableConfig.inferBasicMergingBehavior(null, writeConfig.getPayloadClass(), null,
Review Comment:
not sure if the naming change is apt; can we leave the naming as is?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -260,12 +260,12 @@ private void initRecordMerger(TypedProperties properties, boolean isIngestion) {
    HoodieTableVersion tableVersion = tableConfig.getTableVersion();
    // If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
    if (isIngestion && providedPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(), null,
+     Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferBasicMergingBehavior(null, providedPayloadClass.get(), null,
Review Comment:
we should avoid calling `inferBasicMergingBehavior` for table version 9 unless
we have special circumstances like this.
So, if we plan to fix the impl of `inferBasicMergingBehavior` to do an early
return for table version 9 or above, at least we need to account for expression
payload use-cases.
something to be mindful of.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1858,16 +1858,18 @@ public void testPayloadClassUpdate() throws Exception {
true, false, null, "MERGE_ON_READ");
new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
assertRecordCount(1000, dataSetBasePath, sqlContext);
+   HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
+   assertEquals(metaClient.getTableConfig().getPayloadClass(), DefaultHoodieRecordPayload.class.getName());
    //now create one more deltaStreamer instance and update payload class
    cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
        true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-   //now assert that hoodie.properties file now has updated payload class name
-   HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
-   assertEquals(metaClient.getTableConfig().getPayloadClass(), DummyAvroPayload.class.getName());
+   // NOTE: Payload class cannot be updated, though the write can be executed using different payload classes in the runtime.
+   metaClient = HoodieTableMetaClient.reload(metaClient);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
Review Comment:
I don't think this is a valid use-case.
In fact, I am surprised we have this loose end w/ deltastreamer. The expected
behavior is that we should not let users update the payload class once the
table is created.
We should chase the gap and fix it for HoodieStreamer, and then fix this test
to assert for an exception when the payload class changes.
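The validation being asked for could look roughly like the following; the method name and exception type are illustrative, not the actual HoodieStreamer API:

```java
public class PayloadClassGuard {
  // Sketch of the missing check: reject a write whose payload class differs
  // from the one recorded in table config at creation time.
  static void validatePayloadClassUnchanged(String tablePayloadClass, String writerPayloadClass) {
    if (writerPayloadClass != null && !writerPayloadClass.equals(tablePayloadClass)) {
      throw new IllegalArgumentException(
          "Payload class cannot be updated after table creation: "
              + tablePayloadClass + " -> " + writerPayloadClass);
    }
  }
}
```

The test would then assert that this exception is thrown when the second streamer instance is configured with a different payload class.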
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -186,14 +187,18 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg,
                                                     boolean hasBuiltInDelete,
                                                     Option<Pair<String, String>> customDeleteMarkerKeyAndValue,
                                                     boolean hasInstantRange) {
-   Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
- cfg.getRecordMergeMode(),
- cfg.getPayloadClass(),
- cfg.getRecordMergeStrategyId(),
- cfg.getPreCombineFieldsStr().orElse(null),
- cfg.getTableVersion());
-
- if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
+ RecordMergeMode mergeMode = cfg.getRecordMergeMode();
+ if (cfg.getTableVersion().lesserThan(HoodieTableVersion.NINE)) {
Review Comment:
if expression payload is involved, we might need to call into this even for tbl v9.
@the-other-tim-brown @lokeshj1703: can you confirm?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java:
##########
@@ -182,16 +184,32 @@ static String getPayloadClassName(HoodieConfig config) {
}
static String getPayloadClassName(Properties props) {
-   return getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+   Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
+   if (payloadOpt.isPresent()) {
+     return payloadOpt.get();
+   }
+   // Note: starting from version 9, payload class is not necessarily set, but
+   // merge mode must exist. Therefore, we use merge mode to infer
+   // the payload class for certain corner cases, like for MIT command.
+   if (props.containsKey(RECORD_MERGE_MODE.key())
+       && props.getProperty(RECORD_MERGE_MODE.key()).equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
+     return OverwriteWithLatestAvroPayload.class.getName();
+   }
+   return DEFAULT_PAYLOAD_CLASS_NAME;
  }
+ // NOTE: PAYLOAD_CLASS_NAME is checked before LEGACY_PAYLOAD_CLASS_NAME to make sure
+ // a temporary payload class setting is respected.
static Option<String> getPayloadClassNameIfPresent(Properties props) {
String payloadClassName = null;
if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
+ } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {
Review Comment:
we should be wary of returning even the legacy payload when someone calls
`HoodieRecordPayload.getPayloadClassName(Properties props)`.
The legacy payload should only be used for downgrade code paths, during which
we can get the entire list of table properties and look it up w/ an explicit
key; we don't need a direct getter from HoodieTableConfig.
so, let's avoid this. it might unintentionally expose payload classes to
readers as well, which is not the intent for v9 and above.
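The downgrade-only lookup the reviewer describes could be as simple as an explicit-key read over the full table property set; the key name here is a hypothetical stand-in for the real legacy payload class config key:

```java
import java.util.Properties;

public class LegacyPayloadLookup {
  // Hypothetical key name for the legacy payload class table property.
  static final String LEGACY_PAYLOAD_CLASS_KEY = "hoodie.table.legacy.payload.class";

  // Downgrade-only lookup: read the legacy payload class with an explicit key
  // from the table properties, instead of exposing it through the general
  // getPayloadClassName() resolution that readers also use.
  static String legacyPayloadForDowngrade(Properties tableProps) {
    return tableProps.getProperty(LEGACY_PAYLOAD_CLASS_KEY, "");
  }
}
```

Keeping this lookup local to the downgrade path means readers never observe the legacy payload class through the common resolution chain.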
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -78,27 +95,41 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
    secondUpdate.write.format("hudi").
      option(OPERATION.key(), "upsert").
-     option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+     option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
- options(opts).
mode(SaveMode.Append).
save(basePath)
- // 4. Validate.
- val df = spark.read.format("hudi").options(opts).load(basePath)
+ // 4. Add a trivial update to trigger payload class mismatch.
+ val thirdUpdateData = Seq(
+ (12, "3", "rider-CC", "driver-CC", 33.90, "i"))
+ val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
Review Comment:
why do we need to test this in this test?
why not add a separate, simple test just for updating to a different payload
class? we do not need to test this for all the different payload classes;
doing it for one combination should suffice.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -801,19 +843,93 @@ public String getPayloadClass() {
return HoodieRecordPayload.getPayloadClassName(this);
}
+ public String getLegacyPayloadClass() {
+ return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, "");
+ }
+
public String getRecordMergeStrategyId() {
return getString(RECORD_MERGE_STRATEGY_ID);
}
+ /**
+  * Handle table config creation logic when creating a table for Table Version 9,
+  * which is based on the logic of table version < 9, and then tuned for version 9 logic.
+  * This approach fits the same behavior of upgrade from 8 to 9.
+  */
+ public static Map<String, String> inferMergingConfigsForVersion9(RecordMergeMode recordMergeMode,
+                                                                  String payloadClassName,
+                                                                  String recordMergeStrategyId,
+                                                                  String orderingFieldName,
+                                                                  HoodieTableVersion tableVersion) {
+   Map<String, String> reconciledConfigs = new HashMap<>();
+   if (tableVersion.versionCode() != HoodieTableVersion.NINE.versionCode()) {
+     return reconciledConfigs;
+   }
+
+   // Step 1: Infer merging configs based on input information.
+   // This step is important since it provides the same configs before we do table upgrade.
+   // Then additional logic for table version 9 could be verified.
+   Triple<RecordMergeMode, String, String> inferredConfigs = inferBasicMergingBehavior(
+       recordMergeMode, payloadClassName, recordMergeStrategyId, orderingFieldName, tableVersion);
+   recordMergeMode = inferredConfigs.getLeft();
+   recordMergeStrategyId = inferredConfigs.getRight();
+
+   // Step 2: Handle Version 9 specific logic.
+   // CASE 0: For tables with special merger properties, e.g., with non-builtin mergers.
+   // CASE 1: For tables using MERGE MODE, or CUSTOM builtin mergers.
+   // NOTE: Payload class should NOT be set for these cases.
+   if (!BUILTIN_MERGE_STRATEGIES.contains(recordMergeStrategyId)
+       || StringUtils.isNullOrEmpty(payloadClassName)) {
+     reconciledConfigs.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+     reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), recordMergeStrategyId);
+   } else {
+     // For tables using payload classes.
+     // CASE 2: Custom payload class. We set these properties explicitly.
+     if (!PAYLOADS_UNDER_DEPRECATION.contains(payloadClassName)) {
+       reconciledConfigs.put(RECORD_MERGE_MODE.key(), CUSTOM.toString());
+       reconciledConfigs.put(PAYLOAD_CLASS_NAME.key(), payloadClassName);
+       reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+     } else { // CASE 3: Payload classes are under deprecation.
+       // Standard merging configs.
+       // NOTE: We use LEGACY_PAYLOAD_CLASS_NAME instead of PAYLOAD_CLASS_NAME here.
+       if (EVENT_TIME_BASED_PAYLOADS.contains(payloadClassName)) {
+         reconciledConfigs.put(RECORD_MERGE_MODE.key(), EVENT_TIME_ORDERING.name());
+         reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
+       } else {
+         reconciledConfigs.put(RECORD_MERGE_MODE.key(), COMMIT_TIME_ORDERING.name());
+         reconciledConfigs.put(LEGACY_PAYLOAD_CLASS_NAME.key(), payloadClassName);
+         reconciledConfigs.put(RECORD_MERGE_STRATEGY_ID.key(), COMMIT_TIME_BASED_MERGE_STRATEGY_UUID);
+       }
+       // Partial update mode config.
+       if (payloadClassName.equals(PartialUpdateAvroPayload.class.getName())
+           || payloadClassName.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName())) {
+         reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_DEFAULTS.name());
+       } else if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+         reconciledConfigs.put(PARTIAL_UPDATE_MODE.key(), PartialUpdateMode.IGNORE_MARKERS.name());
+       }
+       // Additional custom properties.
+       if (payloadClassName.equals(PostgresDebeziumAvroPayload.class.getName())) {
+         reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + PARTIAL_UPDATE_CUSTOM_MARKER, DEBEZIUM_UNAVAILABLE_VALUE);
+       } else if (payloadClassName.equals(AWSDmsAvroPayload.class.getName())) {
+         reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_KEY, OP_FIELD);
+         reconciledConfigs.put(MERGE_CUSTOM_PROPERTY_PREFIX + DELETE_MARKER, D_VALUE);
+       }
+     }
+   }
+   return reconciledConfigs;
+ }
+
  /**
   * Infers the merging behavior based on what the user sets (or doesn't set).
-  * Validates that the user has not set an illegal combination of configs
+  * Validates that the user has not set an illegal combination of configs.
+  * This function infers basic merging properties used by table version <= 8.
   */
- public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavior(RecordMergeMode recordMergeMode,
-                                                                                   String payloadClassName,
-                                                                                   String recordMergeStrategyId,
-                                                                                   String orderingFieldNamesAsString,
-                                                                                   HoodieTableVersion tableVersion) {
+ public static Triple<RecordMergeMode, String, String> inferBasicMergingBehavior(RecordMergeMode recordMergeMode,
Review Comment:
curious to understand why we do not have early returns in the impl of this
method for table version 9 and above.
Maybe we can special-case expression payload; but otherwise, we should not
need to infer any merge-related props w/ tbl v9 and above.
I remember explaining this ask for this entire patch before.
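The early-return shape being asked about, with the expression-payload exception carved out, could look like this; the class name constant and integer version check are hypothetical stand-ins, not the actual Hudi identifiers:

```java
public class MergeInference {
  // Hypothetical stand-in for the expression payload class name.
  static final String EXPRESSION_PAYLOAD = "org.example.ExpressionPayload";

  // Skip merge-config inference for table version 9 and above, except for the
  // expression-payload special case flagged in the review.
  static boolean skipInference(int versionCode, String payloadClassName) {
    return versionCode >= 9 && !EXPRESSION_PAYLOAD.equals(payloadClassName);
  }
}
```

`inferBasicMergingBehavior` would then return its inputs unchanged whenever `skipInference(...)` is true, leaving v9+ table properties exactly as the user set them.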
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]