yihua commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1908008572


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(10, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
-    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)

Review Comment:
   ```suggestion
       val commitInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
   ```



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(10, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
-    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime3 = commitCompletedInstant3.getCompletionTime
     assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
 
-    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf3.count())
-    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
-    snapshotDf3.unpersist(true)
+    val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(10, snapshotDf3Rows.length)
+    compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(),
+      snapshotDf3Rows, snapshotDf3Rows)
+    inputDf2.unpersist(true)
 
     // Read Incremental Query, uses hudi_table_changes() table valued function 
for spark sql
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
     //HUDI-5266
-    val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+    val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath, 
"000").get(0)
+    val firstCommit = firstCommitInstant.getCompletionTime

Review Comment:
   ```suggestion
       val firstCommitCompletionTime = firstCommitInstant.getCompletionTime
   ```



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(10, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
-    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime3 = commitCompletedInstant3.getCompletionTime
     assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
 
-    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf3.count())
-    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
-    snapshotDf3.unpersist(true)
+    val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(10, snapshotDf3Rows.length)
+    compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(),
+      snapshotDf3Rows, snapshotDf3Rows)
+    inputDf2.unpersist(true)
 
     // Read Incremental Query, uses hudi_table_changes() table valued function 
for spark sql
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
     //HUDI-5266
-    val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+    val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath, 
"000").get(0)
+    val firstCommit = firstCommitInstant.getCompletionTime

Review Comment:
   Let's make sure all variables have consistent naming.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], 
beforeRows: Array[Row]): Unit = {

Review Comment:
   Does this achieves almost the same functionality as 
`compareUpdateRowsWithHudiRows`?  Could we keep one of them only?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], 
beforeRows: Array[Row]): Unit = {
+    // Helper function to get _row_key from a Row
+    def getRowKey(row: Row): String = row.getAs[String]("_row_key")
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+    // Create hashmaps for O(1) lookups
+    val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+    val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+    // Check that all input rows exist in hudiRows
+    inputRows.foreach { inputRow =>

Review Comment:
   Could we just sort the list by record key and do row comparison?  Will that 
code be easier to understand?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to