This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cb44650c9491 [HUDI-9728] Set record merge properties properly for write and read paths (#13738)
cb44650c9491 is described below
commit cb44650c9491029d5c7eace9b938eef254b4ea99
Author: Lin Liu <[email protected]>
AuthorDate: Fri Aug 22 12:21:36 2025 -0700
[HUDI-9728] Set record merge properties properly for write and read paths (#13738)
Co-authored-by: danny0405 <[email protected]>
---
.../java/org/apache/hudi/table/HoodieTable.java | 8 +-
.../apache/hudi/table/TestHoodieSparkTable.java | 2 +
.../hudi/common/engine/HoodieReaderContext.java | 23 +-
.../hudi/common/model/HoodieRecordPayload.java | 3 -
.../hudi/common/table/HoodieTableConfig.java | 8 +-
.../table/read/BufferedRecordMergerFactory.java | 2 +
.../org/apache/hudi/common/util/ConfigUtils.java | 14 ++
.../table/read/TestPartialUpdateHandler.java | 13 ++
.../apache/hudi/common/util/TestConfigUtils.java | 39 ++++
.../apache/hudi/utils/TestFlinkWriteClients.java | 5 +-
.../functional/TestPayloadDeprecationFlow.scala | 260 +++++++++++++++++----
.../deltastreamer/TestHoodieDeltaStreamer.java | 2 +-
12 files changed, 312 insertions(+), 67 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index fc87ac6fb5b8..5c8e2d1cd680 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -810,10 +810,10 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* @throws HoodieIOException
*/
  void reconcileAgainstMarkers(HoodieEngineContext context,
-                              String instantTs,
-                              List<HoodieWriteStat> stats,
-                              boolean consistencyCheckEnabled,
-                              boolean shouldFailOnDuplicateDataFileDetection,
+                               String instantTs,
+                               List<HoodieWriteStat> stats,
+                               boolean consistencyCheckEnabled,
+                               boolean shouldFailOnDuplicateDataFileDetection,
                                WriteMarkers markers) throws HoodieIOException {
try {
      // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
index 6cc1d42a2a66..e50bb9e5ecb7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieSparkTable.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -84,6 +85,7 @@ public class TestHoodieSparkTable extends HoodieCommonTestHarness {
when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
HoodieStorage storage = mock(HoodieStorage.class);
when(metaClient.getStorage()).thenReturn(storage);
+ when(metaClient.getTableConfig()).thenReturn(new HoodieTableConfig());
additionalFiles.forEach(fileName -> {
try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index fa9c0658e7c0..11ff79eacff7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
@@ -52,10 +53,13 @@ import org.apache.avro.Schema;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.table.HoodieTableConfig.inferMergingConfigsForPreV9Table;
/**
@@ -189,6 +193,10 @@ public abstract class HoodieReaderContext<T> {
return storageConfiguration;
}
+ public TypedProperties getMergeProps(TypedProperties props) {
+ return ConfigUtils.getMergeProps(props, this.tableConfig.getProps());
+ }
+
public Option<Predicate> getKeyFilterOpt() {
return keyFilterOpt;
}
@@ -276,10 +284,17 @@ public abstract class HoodieReaderContext<T> {
HoodieTableVersion tableVersion = tableConfig.getTableVersion();
     // If the provided payload class differs from the table's payload class, we need to infer the correct merging behavior.
     if (isIngestion && writerPayloadClass.map(className -> !className.equals(tableConfig.getPayloadClass())).orElse(false)) {
-      Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferMergingConfigsForWrites(null, writerPayloadClass.get(), null,
-          tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
-      recordMergeMode = triple.getLeft();
-      mergeStrategyId = triple.getRight();
+      if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE)) {
+        Map<String, String> mergeProperties = HoodieTableConfig.inferMergingConfigsForV9TableCreation(
+            null, writerPayloadClass.get(), null, tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
+        recordMergeMode = RecordMergeMode.valueOf(mergeProperties.get(RECORD_MERGE_MODE.key()));
+        mergeStrategyId = mergeProperties.get(RECORD_MERGE_STRATEGY_ID.key());
+      } else {
+        Triple<RecordMergeMode, String, String> triple = HoodieTableConfig.inferMergingConfigsForWrites(
+            null, writerPayloadClass.get(), null, tableConfig.getOrderingFieldsStr().orElse(null), tableVersion);
+        recordMergeMode = triple.getLeft();
+        mergeStrategyId = triple.getRight();
+      }
} else if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
      Triple<RecordMergeMode, String, String> triple = inferMergingConfigsForPreV9Table(
recordMergeMode, tableConfig.getPayloadClass(),
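
Aside (not part of this commit): the new HoodieReaderContext#getMergeProps above is a thin delegation to ConfigUtils. A minimal sketch of the intended call pattern, where all names other than getMergeProps are illustrative only:

    // Hypothetical call site. getMergeProps overlays the table-level entries
    // stored under RECORD_MERGE_PROPERTY_PREFIX (prefix stripped) onto a copy
    // of the per-read properties; the input props object is not mutated.
    TypedProperties readProps = new TypedProperties();
    TypedProperties effectiveProps = readerContext.getMergeProps(readProps);
    // Mergers should consume effectiveProps rather than readProps.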
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 4cadae787020..1d4db1a17c00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -36,7 +36,6 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.common.table.HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
-import static org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
@@ -206,8 +205,6 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
String payloadClassName = null;
if (ConfigUtils.containsConfigProperty(props, PAYLOAD_CLASS_NAME)) {
      payloadClassName = ConfigUtils.getStringWithAltKeys(props, PAYLOAD_CLASS_NAME);
-    } else if (props.containsKey(LEGACY_PAYLOAD_CLASS_NAME.key())) {
-      payloadClassName = ConfigUtils.getStringWithAltKeys(props, LEGACY_PAYLOAD_CLASS_NAME);
    } else if (props.containsKey("hoodie.datasource.write.payload.class")) {
      payloadClassName = props.getProperty("hoodie.datasource.write.payload.class");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index a815f6d88814..2cd47294d87e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -830,10 +830,10 @@ public class HoodieTableConfig extends HoodieConfig {
* This approach fits the same behavior of upgrade from 8 to 9.
*/
  public static Map<String, String> inferMergingConfigsForV9TableCreation(RecordMergeMode recordMergeMode,
-                                                                         String payloadClassName,
-                                                                         String recordMergeStrategyId,
-                                                                         String orderingFieldName,
-                                                                         HoodieTableVersion tableVersion) {
+                                                                          String payloadClassName,
+                                                                          String recordMergeStrategyId,
+                                                                          String orderingFieldName,
+                                                                          HoodieTableVersion tableVersion) {
Map<String, String> reconciledConfigs = new HashMap<>();
if (tableVersion.lesserThan(HoodieTableVersion.NINE)) {
      throw new HoodieIOException("Unsupported flow for table versions less than 9");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index a08886a23b9a..5fd50ffcfd00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -82,6 +82,8 @@ public class BufferedRecordMergerFactory {
      return new PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
}
+    // might need to introduce a merge config for the factory in the future to get rid of this.
+ props = readerContext.getMergeProps(props);
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
if (partialUpdateModeOpt.isEmpty()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 38b7637184bf..4e8ce7f274a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -52,6 +52,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
@@ -139,6 +140,19 @@ public class ConfigUtils {
return props;
}
+ /**
+ * Ensures that the prefixed merge properties are populated for mergers.
+ */
+  public static TypedProperties getMergeProps(TypedProperties props, TypedProperties tableProps) {
+    Map<String, String> mergeProps = extractWithPrefix(tableProps, RECORD_MERGE_PROPERTY_PREFIX);
+ if (mergeProps.isEmpty()) {
+ return props;
+ }
+ TypedProperties copied = TypedProperties.copy(props);
+ mergeProps.forEach(copied::setProperty);
+ return copied;
+ }
+
/**
* Get payload class.
*/
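
Aside (not part of this commit): getMergeProps builds on the pre-existing ConfigUtils.extractWithPrefix. A hedged sketch of the behavior the tests below exercise; the actual implementation may differ in its details:

    // Illustrative only: collect entries whose keys start with the prefix,
    // stripping the prefix from each key; a null input yields an empty map.
    static Map<String, String> extractWithPrefixSketch(TypedProperties props, String prefix) {
      Map<String, String> out = new HashMap<>();
      if (props == null) {
        return out;
      }
      for (String name : props.stringPropertyNames()) {
        if (name.startsWith(prefix)) {
          out.put(name.substring(prefix.length()), props.getProperty(name));
        }
      }
      return out;
    }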
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
index 008c70916fae..1b1bc3b369a1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestPartialUpdateHandler.java
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
+import static org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,6 +41,15 @@ class TestPartialUpdateHandler {
assertTrue(result.isEmpty());
}
+ @Test
+ void testNonEmptyProperties() {
+ TypedProperties props = new TypedProperties();
+    props.put(RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE, DEBEZIUM_UNAVAILABLE_VALUE);
+    Map<String, String> result = PartialUpdateHandler.parseMergeProperties(props);
+    assertTrue(result.containsKey(PARTIAL_UPDATE_UNAVAILABLE_VALUE));
+    assertEquals(DEBEZIUM_UNAVAILABLE_VALUE, result.get(PARTIAL_UPDATE_UNAVAILABLE_VALUE));
+ }
+
@Test
void testDirectMatch() {
Schema stringSchema = Schema.create(Schema.Type.STRING);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 9075228c7802..4781aff2f8d7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -383,4 +383,43 @@ public class TestConfigUtils {
    Map<String, String> result = ConfigUtils.extractWithPrefix(null, RECORD_MERGE_PROPERTY_PREFIX);
assertTrue(result.isEmpty());
}
+
+ @Test
+ void testParseRecordMergePropertiesWithPrefixedProperties() {
+ TypedProperties tableProps = new TypedProperties();
+ tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "strategy", "overwrite");
+ tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "field", "col1");
+
+    TypedProperties props = ConfigUtils.getMergeProps(new TypedProperties(), tableProps);
+ props.put("unrelated.key", "value");
+
+ assertEquals("overwrite", props.get("strategy"));
+ assertEquals("col1", props.get("field"));
+ assertEquals("value", props.get("unrelated.key"));
+ }
+
+ @Test
+ void testParseRecordMergePropertiesWithNoPrefixedProperties() {
+ TypedProperties tableProps = new TypedProperties();
+
+ TypedProperties props = new TypedProperties();
+ props.put("normal.key", "val");
+ props = ConfigUtils.getMergeProps(props, tableProps);
+
+ assertEquals(1, props.size());
+ assertEquals("val", props.get("normal.key"));
+ }
+
+ @Test
+ void testParseRecordMergePropertiesWithOverwrite() {
+ TypedProperties tableProps = new TypedProperties();
+ tableProps.put(RECORD_MERGE_PROPERTY_PREFIX + "strategy", "overwrite");
+
+ TypedProperties props = new TypedProperties();
+ props.put("strategy", "keep");
+ props = ConfigUtils.getMergeProps(props, tableProps);
+
+ assertEquals(1, props.size());
+ assertEquals("overwrite", props.get("strategy"));
+ }
}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 042b217710ef..27bbf96a38df 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -26,6 +26,7 @@ import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
@@ -109,7 +110,7 @@ public class TestFlinkWriteClients {
    assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.EVENT_TIME_ORDERING));
    assertThat(tableConfig.getRecordMergeStrategyId(), is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
-    assertThat(tableConfig.getPayloadClass(), is(EventTimeAvroPayload.class.getName()));
+    assertThat(tableConfig.getPayloadClass(), is(DefaultHoodieRecordPayload.class.getName()));
    HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
    String mergerClasses = writeConfig.getString(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES);
@@ -154,7 +155,7 @@ public class TestFlinkWriteClients {
    assertThat(tableConfig.getRecordMergeMode(), is(RecordMergeMode.EVENT_TIME_ORDERING));
    assertThat(tableConfig.getRecordMergeStrategyId(), is(HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID));
-    assertThat(tableConfig.getPayloadClass(), is(PartialUpdateAvroPayload.class.getName()));
+    assertThat(tableConfig.getPayloadClass(), is(DefaultHoodieRecordPayload.class.getName()));
    HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
    String mergerClasses = writeConfig.getString(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index b3de7692c65e..36961ead2d66 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,8 +21,8 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, ORDERING_FIELDS, RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
import org.apache.hudi.common.model.debezium.{DebeziumConstants, MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
@@ -43,12 +43,14 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
*/
@ParameterizedTest
@MethodSource(Array("providePayloadClassTestCases"))
-  def testMergerBuiltinPayload(tableType: String,
-                               payloadClazz: String,
-                               expectedConfigs: Map[String, String]): Unit = {
+  def testMergerBuiltinPayloadUpgradePath(tableType: String,
+                                          payloadClazz: String,
+                                          expectedConfigs: Map[String, String]): Unit = {
     val opts: Map[String, String] = Map(
-      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz)
-    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME)
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieMetadataConfig.ENABLE.key() -> "false")
+    val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq",
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME)
// 1. Add an insert.
val data = Seq(
(10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
@@ -83,7 +85,8 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
// 2. Add an update.
val firstUpdateData = Seq(
- (11, 1L, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+ (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+ (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
(11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
firstUpdate.write.format("hudi").
@@ -133,7 +136,19 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
}
}
- // 5. Validate.
+ // 5. Add a delete.
+ val fourthUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+ (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val fourthUpdate = spark.createDataFrame(fourthUpdateData).toDF(columns: _*)
+ fourthUpdate.write.format("hudi").
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ mode(SaveMode.Append).
+ save(basePath)
+
+ // 6. Validate.
// Validate table configs.
tableConfig = metaClient.getTableConfig
expectedConfigs.foreach { case (key, expectedValue) =>
@@ -168,6 +183,162 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
&& timeTravelDf.except(expectedTimeTravelDf).isEmpty)
}
+ @ParameterizedTest
+ @MethodSource(Array("providePayloadClassTestCases"))
+  def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
+                                                    payloadClazz: String,
+                                                    expectedConfigs: Map[String, String]): Unit = {
+ val opts: Map[String, String] = Map(
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+ HoodieMetadataConfig.ENABLE.key() -> "false")
+ val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op",
"_event_seq",
+ DebeziumConstants.FLATTENED_FILE_COL_NAME,
DebeziumConstants.FLATTENED_POS_COL_NAME)
+ // 1. Add an insert.
+ val data = Seq(
+ (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
+ (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
+ (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
+ (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
+ (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    val orderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ "_event_bin_file,_event_pos"
+ } else {
+ "ts"
+ }
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "_event_lsn").
+ option(ORDERING_FIELDS.key(), orderingFields).
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+ // Verify table was created successfully
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+ var tableConfig = metaClient.getTableConfig
+ // Verify table version is 9
+ assertEquals(9, tableConfig.getTableVersion.versionCode())
+ assertTrue(metaClient.getActiveTimeline.firstInstant().isPresent)
+ // Verify table properties
+ expectedConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be present")
+ }
+ }
+
+ // 2. Add an update.
+ val firstUpdateData = Seq(
+ (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1),
+ (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
+ (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1))
+ val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+ firstUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ mode(SaveMode.Append).
+ save(basePath)
+ // Validate table version.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+    val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+
+
+ // 3. Add an update. This is expected to trigger the upgrade
+    val compactionEnabled = if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ "true"
+ } else {
+ "false"
+ }
+ val secondUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+ (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
+ (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
+ secondUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ mode(SaveMode.Append).
+ save(basePath)
+ // Validate table version as 9.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
+ assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
+    val compactionInstants = metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
+    val foundCompaction = compactionInstants.stream().anyMatch(i => i.getAction.equals("commit"))
+ assertTrue(foundCompaction)
+
+ // 4. Add a trivial update to trigger payload class mismatch.
+ val thirdUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1))
+ val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ if (!payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ assertThrows[HoodieException] {
+ thirdUpdate.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+          option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
+ classOf[MySqlDebeziumAvroPayload].getName).
+ mode(SaveMode.Append).
+ save(basePath)
+ }
+ }
+
+ // 5. Add a delete.
+ val fourthUpdateData = Seq(
+ (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
+ (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+    val fourthUpdate = spark.createDataFrame(fourthUpdateData).toDF(columns: _*)
+ fourthUpdate.write.format("hudi").
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ mode(SaveMode.Append).
+ save(basePath)
+
+ // 6. Validate.
+ // Validate table configs again.
+ tableConfig = metaClient.getTableConfig
+ expectedConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be present")
+ }
+ }
+ // Validate snapshot query.
+ val df = spark.read.format("hudi").load(basePath)
+ val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare",
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME,
DebeziumConstants.FLATTENED_POS_COL_NAME)
+ .sort("_event_lsn")
+ val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+    val expectedDf = spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns: _*).sort("_event_lsn")
+ expectedDf.show(false)
+ finalDf.show(false)
+ assertTrue(expectedDf.except(finalDf).isEmpty &&
finalDf.except(expectedDf).isEmpty)
+ // Validate time travel query.
+ val timeTravelDf = spark.read.format("hudi")
+ .option("as.of.instant", firstUpdateInstantTime).load(basePath)
+ .select("ts", "_event_lsn", "rider", "driver", "fare", "Op",
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME,
DebeziumConstants.FLATTENED_POS_COL_NAME)
+ .sort("_event_lsn")
+ timeTravelDf.show(false)
+    val expectedTimeTravelData = getExpectedResultForTimeTravelQuery(payloadClazz)
+ val expectedTimeTravelDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns: _*).sort("_event_lsn")
+ expectedTimeTravelDf.show(false)
+ timeTravelDf.show(false)
+ assertTrue(
+ expectedTimeTravelDf.except(timeTravelDf).isEmpty
+ && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+ }
+
def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
val props = TypedProperties.fromMap(hudiOpts.asJava)
HoodieWriteConfig.newBuilder()
@@ -184,25 +355,19 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
|| payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)
|| payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
Seq(
- (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+ (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
- (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
- (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
- (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1))
} else {
Seq(
- (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+ (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
- (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
- (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
- (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1))
}
} else {
Seq(
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
- (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
- (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1),
- (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 9, 1))
}
}
@@ -210,7 +375,7 @@ class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
Seq[(Int, Long, String, String, Double, String, String, Int, Int)] = {
if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
Seq(
- (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1", 11, 1),
+ (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1),
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1),
(10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
(10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
@@ -234,34 +399,27 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ ),
Arguments.of(
"COPY_ON_WRITE",
classOf[OverwriteWithLatestAvroPayload].getName,
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[OverwriteWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
- )
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
),
- Arguments.of(
- "COPY_ON_WRITE",
- classOf[PartialUpdateAvroPayload].getName,
- Map(
- HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
Arguments.of(
"COPY_ON_WRITE",
classOf[PostgresDebeziumAvroPayload].getName,
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PostgresDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
- -> "__debezium_unavailable_value"),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
+ -> "__debezium_unavailable_value")
+ ),
Arguments.of(
"COPY_ON_WRITE",
classOf[MySqlDebeziumAvroPayload].getName,
@@ -276,9 +434,10 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[AWSDmsAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+ ),
Arguments.of(
"COPY_ON_WRITE",
classOf[EventTimeAvroPayload].getName,
@@ -304,15 +463,15 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[DefaultHoodieRecordPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ ),
Arguments.of(
"MERGE_ON_READ",
classOf[OverwriteWithLatestAvroPayload].getName,
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[OverwriteWithLatestAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
- )
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
),
Arguments.of(
"MERGE_ON_READ",
@@ -320,18 +479,20 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+ ),
Arguments.of(
"MERGE_ON_READ",
classOf[PostgresDebeziumAvroPayload].getName,
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[PostgresDebeziumAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID),
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_MARKERS",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
        HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
- -> "__debezium_unavailable_value"),
+ -> "__debezium_unavailable_value")
+ ),
Arguments.of(
"MERGE_ON_READ",
classOf[MySqlDebeziumAvroPayload].getName,
@@ -346,9 +507,10 @@ object TestPayloadDeprecationFlow {
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() ->
classOf[AWSDmsAvroPayload].getName,
- HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID),
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() ->
HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID,
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX +
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+ ),
Arguments.of(
"MERGE_ON_READ",
classOf[EventTimeAvroPayload].getName,
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7948e0f8610f..4cbe5de74d73 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1883,7 +1883,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
//now assert that hoodie.properties file now has updated payload class name
    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
-    assertEquals(metaClient.getTableConfig().getPayloadClass(), PartialUpdateAvroPayload.class.getName());
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), DefaultHoodieRecordPayload.class.getName());
}
@Disabled("To be fixed with HUDI-9714")