This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 27a950ee223 [HUDI-8850] Fix record merge mode related issues and
improve test coverage in Spark SQL (#12725)
27a950ee223 is described below
commit 27a950ee22320d7b447fd3e31693ae9290dd5842
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jan 30 02:07:55 2025 -0800
[HUDI-8850] Fix record merge mode related issues and improve test coverage
in Spark SQL (#12725)
This PR fixes record merge mode issues and improves test coverage in Spark
SQL:
- Table version 6 lacks record merge mode in the config. The file group
reader now infers it if missing, enabling reads of version 6 tables.
- For INSERT INTO in Spark SQL, buildHoodieInsertConfig didn't set record
merge mode properly. The logic is now removed.
- The SQL writer now gets the correct record merge mode from the table
config automatically.
- In MERGE INTO in Spark SQL, the record merge mode check is fixed to avoid
NPE.
- More tests are added in TestMergeModeCommitTimeOrdering and
TestMergeModeEventTimeOrdering for different merge modes. The tests cover both
table versions 6 and 8.
---
.../common/table/read/HoodieFileGroupReader.java | 16 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 18 +-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 13 +-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 33 +-
.../spark/sql/hudi/dml/TestMergeIntoTable2.scala | 3 +-
.../hudi/dml/TestMergeModeCommitTimeOrdering.scala | 349 +++++++++++++--------
.../hudi/dml/TestMergeModeEventTimeOrdering.scala | 263 ++++++++++------
7 files changed, 448 insertions(+), 247 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index bf9653efbf3..e77bded8a77 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
@@ -102,9 +104,17 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.start = start;
this.length = length;
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+ RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
+ String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
+ if
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
+ recordMergeMode, tableConfig.getPayloadClass(),
+ mergeStrategyId, null);
+ recordMergeMode = triple.getLeft();
+ mergeStrategyId = triple.getRight();
+ }
readerContext.setRecordMerger(readerContext.getRecordMerger(
- tableConfig.getRecordMergeMode(),
- tableConfig.getRecordMergeStrategyId(),
+ recordMergeMode, mergeStrategyId,
props.getString(RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY,
props.getString(RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY, ""))));
readerContext.setTablePath(tablePath);
@@ -122,7 +132,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.outputConverter =
readerContext.getSchemaHandler().getOutputConverter();
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
- tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption,
this.logFiles.isEmpty(),
+ recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
isSkipMerge, shouldUseRecordPosition, readStats);
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7de0617b23b..d7de38e9de9 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions,
HoodieFileIndex}
import
org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, RecordMergeMode, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieRecordMerger, WriteOperationType}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
HoodieCommonConfig, TypedProperties}
+import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
@@ -257,21 +257,7 @@ trait ProvidesHoodieConfig extends Logging {
Map()
}
- val deducedPayloadClassName =
classOf[DefaultHoodieRecordPayload].getCanonicalName
- val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
- val recordMergeStrategy =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
-
- if
(tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName)
&&
-
RecordMergeMode.EVENT_TIME_ORDERING.equals(tableConfig.getRecordMergeMode)) {
- tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
- tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
- tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID)
- }
-
val defaultOpts = Map(
- DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> deducedPayloadClassName,
- DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode,
- DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID.key() ->
recordMergeStrategy,
// NOTE: By default insert would try to do deduplication in case that
pre-combine column is specified
// for the table
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key ->
String.valueOf(combineBeforeInsert),
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f90fbef8e82..d6545911c1b 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -28,14 +28,15 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
RECORD_MERGE_MODE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME,
WRITE_PARTIAL_UPDATE_SCHEMA}
+import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME, WRITE_PARTIAL_UPDATE_SCHEMA}
import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.JFunction.scalaFunction1Noop
+
import org.apache.avro.Schema
import org.apache.spark.sql._
-import org.apache.spark.sql.HoodieCatalystExpressionUtils.{MatchCast,
attributeEquals}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.{attributeEquals,
MatchCast}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, EqualTo, Expression, Literal,
NamedExpression, PredicateHelper}
@@ -46,13 +47,14 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions,
getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
-import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{CoercedAttributeReference,
encodeAsBase64String, stripCasting, toStructType, userGuideString,
validateTargetTableAttrExistsInAssignments}
+import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.{encodeAsBase64String,
stripCasting, toStructType, userGuideString,
validateTargetTableAttrExistsInAssignments, CoercedAttributeReference}
import
org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
import java.util.Base64
+
import scala.collection.JavaConverters._
/**
@@ -817,8 +819,9 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// Precombine field and record key field must be present in the assignment
clause of all insert actions for event time ordering mode.
// Check has no effect if we don't have such fields in target table or we
don't have insert actions
// Please note we are relying on merge mode in the table config as writer
merge mode is always "CUSTOM" for MIT.
- if (getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String,
Object]], HoodieTableConfig.RECORD_MERGE_MODE)
- .equals(RecordMergeMode.EVENT_TIME_ORDERING.name())) {
+ if (RecordMergeMode.EVENT_TIME_ORDERING.name()
+
.equals(getStringWithAltKeys(props.asJava.asInstanceOf[java.util.Map[String,
Object]],
+ HoodieTableConfig.RECORD_MERGE_MODE))) {
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
field => {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 16187ba3fd9..0df0f89922b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -23,10 +23,12 @@ import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.storage.HoodieStorage
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient,
getSparkConfForTest}
import org.apache.hadoop.fs.Path
@@ -37,6 +39,7 @@ import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageConta
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import org.slf4j.LoggerFactory
@@ -323,14 +326,22 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
}
- protected def withSparkSqlSessionConfig(configNameValues: (String,
String)*)(f: => Unit): Unit = {
+ protected def withSparkSqlSessionConfig(configNameValues: (String, String)*
+ )(f: => Unit): Unit = {
+ withSparkSqlSessionConfigWithCondition(configNameValues.map(e => (e,
true)): _*)(f)
+ }
+
+ protected def withSparkSqlSessionConfigWithCondition(configNameValues:
((String, String), Boolean)*
+ )(f: => Unit): Unit = {
try {
- configNameValues.foreach { case (configName, configValue) =>
- spark.sql(s"set $configName=$configValue")
+ configNameValues.foreach { case ((configName, configValue), condition) =>
+ if (condition) {
+ spark.sql(s"set $configName=$configValue")
+ }
}
f
} finally {
- configNameValues.foreach { case (configName, _) =>
+ configNameValues.foreach { case ((configName, configValue), condition) =>
spark.sql(s"reset $configName")
}
}
@@ -375,7 +386,19 @@ object HoodieSparkSqlTestBase {
.getActiveTimeline.getInstantDetails(cleanInstant).get)
}
+ def validateTableConfig(storage: HoodieStorage,
+ basePath: String,
+ expectedConfigs: Map[String, String],
+ nonExistentConfigs: Seq[String]): Unit = {
+ val tableConfig = HoodieTableConfig.loadFromHoodieProps(storage, basePath)
+ expectedConfigs.foreach(e => {
+ assertEquals(e._2, tableConfig.getString(e._1),
+ s"Table config ${e._1} should be ${e._2} but is
${tableConfig.getString(e._1)}")
+ })
+ nonExistentConfigs.foreach(e => assertFalse(
+ tableConfig.contains(e), s"$e should not be present in the table
config"))
+ }
+
private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)
-
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index 1e504fecd0c..bd8f7676e02 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -24,7 +24,6 @@ import
org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.Row
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-
import org.slf4j.LoggerFactory
class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
@@ -1254,7 +1253,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| ts BIGINT
|) using hudi
|TBLPROPERTIES (
- | type = 'cow',
+ | type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| recordMergeMode = '$recordMergeMode'
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index a0af78797e6..67e7d992b19 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -17,15 +17,76 @@
package org.apache.spark.sql.hudi.dml
+import org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING
+import
org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
+
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
- Seq("mor").foreach { tableType =>
- // [HUDI-8850] For COW commit time ordering does not work.
- // Seq("cow", "mor").foreach { tableType =>
- test(s"Test $tableType table with COMMIT_TIME_ORDERING merge mode") {
-
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" ->
"0"
+ // TODO(HUDI-8938): add "mor,true,true,6" after the fix
+ Seq(
+ "cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
+ "cow,6,true,false", "cow,6,true,true",
+ "mor,8,false,false", "mor,8,false,true", "mor,8,true,false",
+ "mor,6,true,false").foreach { args =>
+ val argList = args.split(',')
+ val tableType = argList(0)
+ val tableVersion = argList(1)
+ val setRecordMergeConfigs = argList(2).toBoolean
+ val setUpsertOperation = argList(3).toBoolean
+ val isUpsert = setUpsertOperation || (tableVersion.toInt != 6 &&
setRecordMergeConfigs)
+ val storage = HoodieTestUtils.getDefaultStorage
+ val mergeConfigClause = if (setRecordMergeConfigs) {
+ // with precombine field set, UPSERT operation is used automatically
+ if (tableVersion.toInt == 6) {
+ // Table version 6
+ s", payloadClass =
'${classOf[OverwriteWithLatestAvroPayload].getName}'"
+ } else {
+ // Current table version (8)
+ ", preCombineField = 'ts',\nhoodie.record.merge.mode =
'COMMIT_TIME_ORDERING'"
+ }
+ } else {
+ // By default, the COMMIT_TIME_ORDERING is used if not specified by the
user
+ ""
+ }
+ val writeTableVersionClause = if (tableVersion.toInt == 6) {
+ s"hoodie.write.table.version = $tableVersion,"
+ } else {
+ ""
+ }
+ val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+ Map(
+ HoodieTableConfig.VERSION.key -> "6",
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[OverwriteWithLatestAvroPayload].getName)
+ } else {
+ Map(
+ HoodieTableConfig.VERSION.key -> "8",
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> COMMIT_TIME_ORDERING.name(),
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[OverwriteWithLatestAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key ->
COMMIT_TIME_BASED_MERGE_STRATEGY_UUID)
+ }
+ val nonExistentConfigs = if (tableVersion.toInt == 6) {
+ Seq(HoodieTableConfig.RECORD_MERGE_MODE.key,
HoodieTableConfig.PRECOMBINE_FIELD.key)
+ } else {
+ if (setRecordMergeConfigs) {
+ Seq()
+ } else {
+ Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
+ }
+ }
+
+ test(s"Test $tableType table with COMMIT_TIME_ORDERING
(tableVersion=$tableVersion,"
+ +
s"setRecordMergeConfigs=$setRecordMergeConfigs,setUpsertOperation=$setUpsertOperation)")
{
+ withSparkSqlSessionConfigWithCondition(
+ ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+ ("hoodie.spark.sql.insert.into.operation" -> "upsert",
setUpsertOperation),
+ // TODO(HUDI-8820): enable MDT after supporting MDT with table version
6
+ ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
) {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
@@ -39,14 +100,15 @@ class TestMergeModeCommitTimeOrdering extends
HoodieSparkSqlTestBase {
| ts long
| ) using hudi
| tblproperties (
+ | $writeTableVersionClause
| type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
+ | primaryKey = 'id'
+ | $mergeConfigClause
| )
| location '${tmp.getCanonicalPath}'
""".stripMargin)
-
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
// Insert initial records with ts=100
spark.sql(
s"""
@@ -59,99 +121,119 @@ class TestMergeModeCommitTimeOrdering extends
HoodieSparkSqlTestBase {
// Verify inserting records with the same ts value are visible
(COMMIT_TIME_ORDERING)
spark.sql(
s"""
- | insert into $tableName
- | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
- | union all
- | select 2, 'B_equal', 70.0, 100
+ | insert into $tableName
+ | select 1 as id, 'A_equal' as name, 60.0 as price, 100 as ts
+ | union all
+ | select 2, 'B_equal', 70.0, 100
""".stripMargin)
-
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A_equal", 60.0, 100),
- Seq(2, "B_equal", 70.0, 100)
- )
+ (if (isUpsert) {
+ // With UPSERT operation, there is no duplicate
+ Seq(
+ Seq(1, "A_equal", 60.0, 100),
+ Seq(2, "B_equal", 70.0, 100))
+ } else {
+ // With INSERT operation, there are duplicates
+ Seq(
+ Seq(1, "A", 10.0, 100),
+ Seq(1, "A_equal", 60.0, 100),
+ Seq(2, "B", 20.0, 100),
+ Seq(2, "B_equal", 70.0, 100))
+ }): _*)
- // Verify updating records with the same ts value are visible
(COMMIT_TIME_ORDERING)
- spark.sql(
- s"""
- | update $tableName
- | set price = 50.0, ts = 100
- | where id = 1
+ if (isUpsert) {
+ // Verify updating records with the same ts value are visible
(COMMIT_TIME_ORDERING)
+ spark.sql(
+ s"""
+ | update $tableName
+ | set price = 50.0, ts = 100
+ | where id = 1
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(1, "A_equal", 50.0, 100),
+ Seq(2, "B_equal", 70.0, 100))
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A_equal", 50.0, 100),
- Seq(2, "B_equal", 70.0, 100)
- )
-
- // Verify inserting records with a lower ts value are visible
(COMMIT_TIME_ORDERING)
- spark.sql(
- s"""
- | insert into $tableName
- | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
- | union all
- | select 2, 'B', 40.0, 99
+ // Verify inserting records with a lower ts value are visible
(COMMIT_TIME_ORDERING)
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 1 as id, 'A' as name, 30.0 as price, 99 as ts
+ | union all
+ | select 2, 'B', 40.0, 99
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A", 30.0, 99),
- Seq(2, "B", 40.0, 99)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(1, "A", 30.0, 99),
+ Seq(2, "B", 40.0, 99))
- // Verify updating records with a lower ts value are visible
(COMMIT_TIME_ORDERING)
- spark.sql(
- s"""
- | update $tableName
- | set price = 50.0, ts = 98
- | where id = 1
+ // Verify updating records with a lower ts value are visible
(COMMIT_TIME_ORDERING)
+ spark.sql(
+ s"""
+ | update $tableName
+ | set price = 50.0, ts = 98
+ | where id = 1
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A", 50.0, 98),
- Seq(2, "B", 40.0, 99)
- )
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(1, "A", 50.0, 98),
+ Seq(2, "B", 40.0, 99))
- // Verify inserting records with a higher ts value are visible
(COMMIT_TIME_ORDERING)
- spark.sql(
- s"""
- | insert into $tableName
- | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
- | union all
- | select 2, 'B', 40.0, 101
+ // Verify inserting records with a higher ts value are visible
(COMMIT_TIME_ORDERING)
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 1 as id, 'A' as name, 30.0 as price, 101 as ts
+ | union all
+ | select 2, 'B', 40.0, 101
""".stripMargin)
- // Verify records with ts=101 are visible
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A", 30.0, 101),
- Seq(2, "B", 40.0, 101)
- )
+ // Verify records with ts=101 are visible
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(1, "A", 30.0, 101),
+ Seq(2, "B", 40.0, 101)
+ )
- // Verify updating records with a higher ts value are visible
(COMMIT_TIME_ORDERING)
- spark.sql(
- s"""
- | update $tableName
- | set price = 50.0, ts = 102
- | where id = 1
+ // Verify updating records with a higher ts value are visible
(COMMIT_TIME_ORDERING)
+ spark.sql(
+ s"""
+ | update $tableName
+ | set price = 50.0, ts = 102
+ | where id = 1
""".stripMargin)
- // Verify final state after all operations
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(1, "A", 50.0, 102),
- Seq(2, "B", 40.0, 101)
- )
-
- // Delete record
- spark.sql(s"delete from $tableName where id = 1")
+ // Verify final state after all operations
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(1, "A", 50.0, 102),
+ Seq(2, "B", 40.0, 101)
+ )
- // Verify deletion
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(2, "B", 40.0, 101)
- )
+ // Delete record
+ spark.sql(s"delete from $tableName where id = 1")
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ // Verify deletion
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(2, "B", 40.0, 101)
+ )
+ }
})
}
}
- test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType
table") {
-
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" ->
"0") {
+ // TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in
MERGE INTO for COW
+ test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType
table "
+ +
s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs,"
+ + s"setUpsertOperation=$setUpsertOperation)") {
+ withSparkSqlSessionConfigWithCondition(
+ ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+ ("hoodie.spark.sql.insert.into.operation" -> "upsert",
setUpsertOperation),
+ // TODO(HUDI-8820): enable MDT after supporting MDT with table version
6
+ ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
+ ) {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create table with COMMIT_TIME_ORDERING
@@ -164,82 +246,103 @@ class TestMergeModeCommitTimeOrdering extends
HoodieSparkSqlTestBase {
| ts long
| ) using hudi
| tblproperties (
+ | $writeTableVersionClause
| type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | hoodie.record.merge.mode = 'COMMIT_TIME_ORDERING'
+ | primaryKey = 'id'
+ | $mergeConfigClause
| )
| location '${tmp.getCanonicalPath}'
- """.stripMargin)
+ """.stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
// Insert initial records
spark.sql(
s"""
| insert into $tableName
- | select 1 as id, 'A' as name, 10.0 as price, 100 as ts union
all
- | select 0, 'X', 20.0, 100 union all
- | select 2, 'B', 20.0, 100 union all
- | select 3, 'C', 30.0, 100 union all
- | select 4, 'D', 40.0, 100 union all
- | select 5, 'E', 50.0, 100 union all
- | select 6, 'F', 60.0, 100
- """.stripMargin)
+ | select 1 as id, 'A' as name, 10.0 as price, 100L as ts union
all
+ | select 0, 'X', 20.0, 100L union all
+ | select 2, 'B', 20.0, 100L union all
+ | select 3, 'C', 30.0, 100L union all
+ | select 4, 'D', 40.0, 100L union all
+ | select 5, 'E', 50.0, 100L union all
+ | select 6, 'F', 60.0, 100L
+ """.stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ // TODO(HUDI-8840): enable MERGE INTO with deletes
+ val shouldTestMergeIntoDelete = setRecordMergeConfigs &&
tableVersion.toInt == 8
// Merge operation - delete with higher, lower and equal ordering
field value, all should take effect.
- spark.sql(
- s"""
- | merge into $tableName t
- | using (
- | select 1 as id, 'B2' as name, 25.0 as price, 101 as ts
union all
- | select 2, '', 55.0, 99 as ts union all
- | select 0, '', 55.0, 100 as ts
- | ) s
- | on t.id = s.id
- | when matched then delete
- """.stripMargin)
+ if (shouldTestMergeIntoDelete) {
+ spark.sql(
+ s"""
+ | merge into $tableName t
+ | using (
+ | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts
union all
+ | select 2, '', 55.0, 99L as ts union all
+ | select 0, '', 55.0, 100L as ts
+ | ) s
+ | on t.id = s.id
+ | when matched then delete
+ """.stripMargin)
+ }
// Merge operation - update with mixed ts values
spark.sql(
s"""
| merge into $tableName t
| using (
- | select 4 as id, 'D2' as name, 45.0 as price, 101 as ts
union all
- | select 5, 'E2', 55.0, 99 as ts union all
- | select 6, 'F2', 65.0, 100 as ts
+ | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts
union all
+ | select 5, 'E2', 55.0, 99L as ts union all
+ | select 6, 'F2', 65.0, 100L as ts
| ) s
| on t.id = s.id
| when matched then update set *
- """.stripMargin)
+ """.stripMargin)
// Verify state after merges
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ val nonDeletedRows: Seq[Seq[Any]] = if (shouldTestMergeIntoDelete) {
+ Seq()
+ } else {
+ Seq(Seq(0, "X", 20.0, 100),
+ Seq(1, "A", 10.0, 100),
+ Seq(2, "B", 20.0, 100))
+ }
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(3, "C", 30.0, 100),
- Seq(4, "D2", 45.0, 101),
- Seq(5, "E2", 55.0, 99),
- Seq(6, "F2", 65.0, 100)
- )
+ (nonDeletedRows ++ Seq(
+ Seq(3, "C", 30.0, 100),
+ Seq(4, "D2", 45.0, 101),
+ Seq(5, "E2", 55.0, 99),
+ Seq(6, "F2", 65.0, 100)
+ )): _*)
// Insert new records through merge
spark.sql(
s"""
| merge into $tableName t
| using (
- | select 7 as id, 'D2' as name, 45.0 as price, 100 as ts
union all
- | select 8, 'E2', 55.0, 100 as ts
+ | select 7 as id, 'D2' as name, 45.0 as price, 100L as ts
union all
+ | select 8, 'E2', 55.0, 100L as ts
| ) s
| on t.id = s.id
| when not matched then insert *
- """.stripMargin)
+ """.stripMargin)
// Verify final state
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(3, "C", 30.0, 100),
- Seq(4, "D2", 45.0, 101),
- Seq(5, "E2", 55.0, 99),
- Seq(6, "F2", 65.0, 100),
- Seq(7, "D2", 45.0, 100),
- Seq(8, "E2", 55.0, 100)
- )
+ (nonDeletedRows ++ Seq(
+ Seq(3, "C", 30.0, 100),
+ Seq(4, "D2", 45.0, 101),
+ Seq(5, "E2", 55.0, 99),
+ Seq(6, "F2", 65.0, 100),
+ Seq(7, "D2", 45.0, 100),
+ Seq(8, "E2", 55.0, 100)
+ )): _*)
})
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index f0ffbc3a38a..69a5c83d6ae 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -18,16 +18,68 @@
package org.apache.spark.sql.hudi.dml
import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import
org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConfig
class TestMergeModeEventTimeOrdering extends HoodieSparkSqlTestBase {
- Seq("cow", "mor").foreach { tableType =>
- test(s"Test $tableType table with EVENT_TIME_ORDERING merge mode") {
- withSparkSqlSessionConfig(
- "hoodie.merge.small.file.group.candidates.limit" -> "0",
- DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true"
+ // TODO(HUDI-8938): add "mor,6,true", "mor,6,false" after the fix
+ Seq("cow,8,true", "cow,8,false", "cow,6,true", "cow,6,false",
+ "mor,8,true", "mor,8,false").foreach { args =>
+ val argList = args.split(',')
+ val tableType = argList(0)
+ val tableVersion = argList(1)
+ val setRecordMergeConfigs = argList(2).toBoolean
+ val storage = HoodieTestUtils.getDefaultStorage
+ val mergeConfigClause = if (setRecordMergeConfigs) {
+ if (tableVersion.toInt == 6) {
+ // Table version 6
+ s", payloadClass = '${classOf[DefaultHoodieRecordPayload].getName}'"
+ } else {
+ // Current table version (8)
+ ", hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'"
+ }
+ } else {
+ ""
+ }
+ val writeTableVersionClause = if (tableVersion.toInt == 6) {
+ s"hoodie.write.table.version = $tableVersion,"
+ } else {
+ ""
+ }
+ val expectedMergeConfigs = if (tableVersion.toInt == 6) {
+ Map(
+ HoodieTableConfig.VERSION.key -> "6",
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts"
+ )
+ } else {
+ Map(
+ HoodieTableConfig.VERSION.key -> "8",
+ HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts",
+ HoodieTableConfig.RECORD_MERGE_MODE.key -> EVENT_TIME_ORDERING.name(),
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key ->
classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key ->
EVENT_TIME_BASED_MERGE_STRATEGY_UUID)
+ }
+ val nonExistentConfigs = if (tableVersion.toInt == 6) {
+ Seq(HoodieTableConfig.RECORD_MERGE_MODE.key)
+ } else {
+ Seq()
+ }
+
+ test(s"Test $tableType table with EVENT_TIME_ORDERING
(tableVersion=$tableVersion,"
+ + s"setRecordMergeConfigs=$setRecordMergeConfigs)") {
+ withSparkSqlSessionConfigWithCondition(
+ ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+ (DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key ->
"true", true),
+ // TODO(HUDI-8820): enable MDT after supporting MDT with table version
6
+ ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
) {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
@@ -41,13 +93,16 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
| ts long
| ) using hudi
| tblproperties (
+ | $writeTableVersionClause
| type = '$tableType',
| primaryKey = 'id',
- | preCombineField = 'ts',
- | hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
+ | preCombineField = 'ts'
+ | $mergeConfigClause
| )
| location '${tmp.getCanonicalPath}'
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
// Insert initial records with ts=100
spark.sql(
@@ -67,6 +122,8 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
| select 2, 'B_equal', 70.0, 100
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
Seq(1, "A_equal", 60.0, 100),
Seq(2, "B_equal", 70.0, 100)
@@ -80,6 +137,8 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
| where id = 1
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
Seq(1, "A_equal", 50.0, 100),
Seq(2, "B_equal", 70.0, 100)
@@ -94,6 +153,8 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
| select 2, 'B', 40.0, 99
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
Seq(1, "A_equal", 50.0, 100),
Seq(2, "B_equal", 70.0, 100)
@@ -107,6 +168,8 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
| where id = 1
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
Seq(1, "A_equal", 50.0, 100),
Seq(2, "B_equal", 70.0, 100)
@@ -164,110 +227,124 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
// Delete record with no ts.
spark.sql(s"delete from $tableName where id = 1")
// Verify deletion
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
Seq(2, "B", 40.0, 101)
)
})
}
}
- }
- Seq("mor").foreach { tableType =>
- // [HUDI-8915]: COW MIT delete does not honor event time ordering. For
update we have the coverage in
- // "Test MergeInto with commit time/event time ordering coverage".
- // Seq("cow", "mor").foreach { tableType =>
- test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType
table") {
-
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" ->
"0") {
- withRecordType()(withTempDir { tmp =>
- val tableName = generateTableName
- // Create table with EVENT_TIME_ORDERING
- spark.sql(
- s"""
- | create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- | ) using hudi
- | tblproperties (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | hoodie.record.merge.mode = 'EVENT_TIME_ORDERING'
- | )
- | location '${tmp.getCanonicalPath}'
+ if ("mor".equals(tableType)) {
+ // [HUDI-8915]: COW MIT delete does not honor event time ordering. For
update we have the coverage in
+ // "Test MergeInto with commit time/event time ordering coverage".
+ // Seq("cow", "mor").foreach { tableType =>
+ test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType
table "
+ +
s"(tableVersion=$tableVersion,setRecordMergeConfigs=$setRecordMergeConfigs)") {
+ withSparkSqlSessionConfigWithCondition(
+ ("hoodie.merge.small.file.group.candidates.limit" -> "0", true),
+ // TODO(HUDI-8820): enable MDT after supporting MDT with table
version 6
+ ("hoodie.metadata.enable" -> "false", tableVersion.toInt == 6)
+ ) {
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create table with EVENT_TIME_ORDERING
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | tblproperties (
+ | $writeTableVersionClause
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | $mergeConfigClause
+ | )
+ | location '${tmp.getCanonicalPath}'
""".stripMargin)
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
- // Insert initial records with ts=100
- spark.sql(
- s"""
- | insert into $tableName
- | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts union
all
- | select 1, 'A', 10.0, 100L union all
- | select 2, 'B', 20.0, 100L union all
- | select 3, 'C', 30.0, 100L union all
- | select 4, 'D', 40.0, 100L union all
- | select 5, 'E', 50.0, 100L union all
- | select 6, 'F', 60.0, 100L
+ // Insert initial records with ts=100
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | select 0 as id, 'A0' as name, 0.0 as price, 100L as ts
union all
+ | select 1, 'A', 10.0, 100L union all
+ | select 2, 'B', 20.0, 100L union all
+ | select 3, 'C', 30.0, 100L union all
+ | select 4, 'D', 40.0, 100L union all
+ | select 5, 'E', 50.0, 100L union all
+ | select 6, 'F', 60.0, 100L
""".stripMargin)
- // Merge operation - delete with arbitrary ts value (lower, equal
and higher). Lower ts won't take effect.
- spark.sql(
- s"""
- | merge into $tableName t
- | using (
- | select 0 as id, 'B2' as name, 25.0 as price, 100L as ts
union all
- | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts
union all
- | select 2 as id, 'B2' as name, 25.0 as price, 99L as ts
- | ) s
- | on t.id = s.id
- | when matched then delete
+ // Merge operation - delete with arbitrary ts value (lower, equal
and higher). Lower ts won't take effect.
+ spark.sql(
+ s"""
+ | merge into $tableName t
+ | using (
+ | select 0 as id, 'B2' as name, 25.0 as price, 100L as ts
union all
+ | select 1 as id, 'B2' as name, 25.0 as price, 101L as ts
union all
+ | select 2 as id, 'B2' as name, 25.0 as price, 99L as ts
+ | ) s
+ | on t.id = s.id
+ | when matched then delete
""".stripMargin)
- // Merge operation - update with mixed ts values (only equal or
higher ts should take effect)
- spark.sql(
- s"""
- | merge into $tableName t
- | using (
- | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts
union all
- | select 5, 'E2', 55.0, 99L as ts union all
- | select 6, 'F2', 65.0, 100L as ts
- | ) s
- | on t.id = s.id
- | when matched then update set *
+ // Merge operation - update with mixed ts values (only equal or
higher ts should take effect)
+ spark.sql(
+ s"""
+ | merge into $tableName t
+ | using (
+ | select 4 as id, 'D2' as name, 45.0 as price, 101L as ts
union all
+ | select 5, 'E2', 55.0, 99L as ts union all
+ | select 6, 'F2', 65.0, 100L as ts
+ | ) s
+ | on t.id = s.id
+ | when matched then update set *
""".stripMargin)
- // Verify state after merges
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(2, "B", 20.0, 100),
- Seq(3, "C", 30.0, 100),
- Seq(4, "D2", 45.0, 101),
- Seq(5, "E", 50.0, 100),
- Seq(6, "F2", 65.0, 100)
- )
+ // Verify state after merges
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(2, "B", 20.0, 100),
+ Seq(3, "C", 30.0, 100),
+ Seq(4, "D2", 45.0, 101),
+ Seq(5, "E", 50.0, 100),
+ Seq(6, "F2", 65.0, 100)
+ )
- // Insert new records through merge
- spark.sql(
- s"""
- | merge into $tableName t
- | using (
- | select 7 as id, 'G' as name, 70.0 as price, 99 as ts union
all
- | select 8, 'H', 80.0, 99 as ts
- | ) s
- | on t.id = s.id
- | when not matched then insert *
+ // Insert new records through merge
+ spark.sql(
+ s"""
+ | merge into $tableName t
+ | using (
+ | select 7 as id, 'G' as name, 70.0 as price, 99 as ts
union all
+ | select 8, 'H', 80.0, 99 as ts
+ | ) s
+ | on t.id = s.id
+ | when not matched then insert *
""".stripMargin)
- checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
- Seq(2, "B", 20.0, 100),
- Seq(3, "C", 30.0, 100),
- Seq(4, "D2", 45.0, 101),
- Seq(5, "E", 50.0, 100),
- Seq(6, "F2", 65.0, 100),
- Seq(7, "G", 70.0, 99),
- Seq(8, "H", 80.0, 99)
- )
- })
+ validateTableConfig(
+ storage, tmp.getCanonicalPath, expectedMergeConfigs,
nonExistentConfigs)
+ checkAnswer(s"select id, name, price, ts from $tableName order by
id")(
+ Seq(2, "B", 20.0, 100),
+ Seq(3, "C", 30.0, 100),
+ Seq(4, "D2", 45.0, 101),
+ Seq(5, "E", 50.0, 100),
+ Seq(6, "F2", 65.0, 100),
+ Seq(7, "G", 70.0, 99),
+ Seq(8, "H", 80.0, 99)
+ )
+ })
+ }
}
}
}