lokeshj1703 commented on code in PR #9760:
URL: https://github.com/apache/hudi/pull/9760#discussion_r1334140111
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala:
##########
@@ -1236,4 +1238,98 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
})
}
}
+
+ test("Test MergeInto With Record Index and Data Skipping") {
+ Seq(true, false).foreach { sparkSqlOptimizedWrites =>
+ withRecordType()(withTempDir { tmp =>
+ spark.sql("set hoodie.payload.combined.schema.validate = false")
+ val tableName = generateTableName
+
+ val cls = classOf[HoodieFileIndex]
+ val appender = LoggingTestUtils.attachInMemoryAppender(cls)
+ var msg = "Total file slices: 1; candidate file slices after data
skipping: 0; skipping percentage 1.0"
+
+ // Create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts',
+ | 'hoodie.metadata.enable' = 'true',
+ | 'hoodie.metadata.record.index.enable' = 'true',
+ | 'hoodie.enable.data.skipping' = 'true'
+ | )
+ """.stripMargin)
+
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+ spark.sql("set hoodie.metadata.enable = true")
+ spark.sql("set hoodie.metadata.record.index.enable = true")
+ spark.sql("set hoodie.enable.data.skipping = true")
+ spark.sql("set hoodie.parquet.small.file.limit = 0")
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (2, 'a2', 10, 1000),
+ | (3, 'a3', 10, 1000)
+ """.stripMargin)
+
+ assertEquals(0, LoggingTestUtils.getMatchingLogEvents(msg,
appender).count())
+ appender.clear()
+
+ // First merge with a extra input field 'flag' (insert a new record)
+ spark.sql(
+ s"""
+ | merge into $tableName
+ | using (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as
flag
+ | ) s0
+ | on s0.id = $tableName.id
+ | when matched and flag = '1' then update set
+ | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+ | when not matched and flag = '1' then insert *
+ |
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000)
+ )
+
+ assertEquals(1, LoggingTestUtils.getMatchingLogEvents(msg,
appender).count())
+ msg = "Total file slices: 2; candidate file slices after data
skipping: 1; skipping percentage 0.5"
+ assertEquals(0, LoggingTestUtils.getMatchingLogEvents(msg,
appender).count())
Review Comment:
The physical plan doesn't have much info. For example this is a sample here:
```
CommandResult <empty>
+- Execute MergeIntoHoodieTableCommand
+- MergeIntoHoodieTableCommand MergeIntoTable (id#271 = id#258),
[updateaction(None, assignment(id#258, id#271), assignment(name#259, name#272),
assignment(price#260, (cast(price#273 as double) + price#260)),
assignment(ts#261L, ansi_cast(ts#274 as bigint)))], [insertaction(None,
assignment(id#258, id#271), assignment(name#259, name#272),
assignment(price#260, ansi_cast(price#273 as double)), assignment(ts#261L,
ansi_cast(ts#274 as bigint)))]
```
Even with select queries we had a similar issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]