codope commented on code in PR #9760:
URL: https://github.com/apache/hudi/pull/9760#discussion_r1332933790
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -1236,4 +1238,98 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
})
}
}
+
+ test("Test MergeInto With Record Index and Data Skipping") {
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = false")
+ val tableName = generateTableName
+
+ val cls = classOf[HoodieFileIndex]
+ val appender = LoggingTestUtils.attachInMemoryAppender(cls)
+ var msg = "Total file slices: 1; candidate file slices after data skipping: 0; skipping percentage 1.0"
+
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts',
+ | 'hoodie.metadata.enable' = 'true',
+ | 'hoodie.metadata.record.index.enable' = 'true',
+ | 'hoodie.enable.data.skipping' = 'true'
+ | )
+ """.stripMargin)
+
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.record.index.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql("set hoodie.parquet.small.file.limit = 0")
Review Comment:
you're setting this so that each insert goes to a different file?
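
A minimal sketch of how that intent could be asserted directly in the test (not part of the PR): it assumes the check runs after the merge shown further down, that at that point the table is backed by exactly two base files, and that Dataset.inputFiles, a standard Spark API, is an acceptable way to count them.

    // Sketch: with hoodie.parquet.small.file.limit = 0, the merge's insert
    // should land in a new file group rather than being appended to the
    // existing small file, so a full scan should now be backed by two files.
    val backingFiles = spark.sql(s"select * from $tableName").inputFiles.toSet
    assertEquals(2, backingFiles.size)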
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -1236,4 +1238,98 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
})
}
}
+
+ test("Test MergeInto With Record Index and Data Skipping") {
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = false")
+ val tableName = generateTableName
+
+ val cls = classOf[HoodieFileIndex]
+ val appender = LoggingTestUtils.attachInMemoryAppender(cls)
+ var msg = "Total file slices: 1; candidate file slices after data skipping: 0; skipping percentage 1.0"
+
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts',
+ | 'hoodie.metadata.enable' = 'true',
+ | 'hoodie.metadata.record.index.enable' = 'true',
+ | 'hoodie.enable.data.skipping' = 'true'
+ | )
+ """.stripMargin)
+
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.record.index.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql("set hoodie.parquet.small.file.limit = 0")
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (2, 'a2', 10, 1000),
+ | (3, 'a3', 10, 1000)
+ """.stripMargin)
+
+ assertEquals(0, LoggingTestUtils.getMatchingLogEvents(msg, appender).count())
+ appender.clear()
+
+ // First merge with an extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
+ |
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ assertEquals(1, LoggingTestUtils.getMatchingLogEvents(msg, appender).count())
+ msg = "Total file slices: 2; candidate file slices after data skipping: 1; skipping percentage 0.5"
+ assertEquals(0, LoggingTestUtils.getMatchingLogEvents(msg, appender).count())
Review Comment:
Is there a way to validate the plan itself? For example, we validate partition pruning in the plan here:
https://github.com/apache/hudi/blob/c1062ff75be2ea9f6febc64ca3865e84c7e6c66d/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala#L1055-L1058
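
One possible shape for that (a hedged sketch, not the exact pattern from TestCOWDataSource): pull the scan node out of the physical plan and re-run its filters through the FileIndex, then assert on the number of candidate files instead of matching log lines. This assumes the snapshot read is planned as a Spark FileSourceScanExec over a HoodieFileIndex (true for the base-file-only read path; other read paths would need a different hook), and the predicate id = 1 with an expected single candidate file is only for illustration.

    import org.apache.spark.sql.execution.FileSourceScanExec

    val df = spark.sql(s"select id, name, price, ts from $tableName where id = 1")
    // Grab the file scan from the pre-AQE physical plan.
    val scan = df.queryExecution.sparkPlan.collectFirst {
      case s: FileSourceScanExec => s
    }.get
    // Re-apply the scan's own filters through the FileIndex; for Hudi this is
    // the listing path that emits the "candidate file slices" log line.
    val candidateFiles = scan.relation.location
      .listFiles(scan.partitionFilters, scan.dataFilters)
      .flatMap(_.files)
    assertEquals(1, candidateFiles.size)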
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]