yihua commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1913962593


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,31 +325,67 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  // Helper function to check if two rows are equal (comparing only the 
columns we care about)
+  def rowsEqual(row1: Row, row2: Row): Boolean = {
+    // Get schemas from rows
+    val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+    val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+    // Verify schemas are identical
+    if (schema1 != schema2) {
+      throw new AssertionError(
+        s"""Schemas are different:
+            |Schema 1: ${schema1.treeString}
+            |Schema 2: ${schema2.treeString}""".stripMargin)
+    }
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+    // Compare all fields using schema
+    schema1.fields.forall { field =>
+      val idx1 = row1.fieldIndex(field.name)
+      val idx2 = row2.fieldIndex(field.name)
+      row1.get(idx1) == row2.get(idx2)
+    }
   }
 
-  def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: 
Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
+  // Verify beforeRows + deltaRows = afterRows
+  // Make sure rows in [[afterRows]] are present in either [[deltaRows]] or 
[[beforeRows]]
+  def compareUpdateRowsWithHudiRows(deltaRows: Array[Row], afterRows: 
Array[Row], beforeRows: Array[Row]): Unit = {
+    // Helper function to get _row_key from a Row
+    def getRowKey(row: Row): String = row.getAs[String]("_row_key")
+
+    // Create hashmaps for O(1) lookups
+    val deltaRowsMap = deltaRows.map(row => getRowKey(row) -> row).toMap
+    val beforeRowsMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+    // Ensure no duplicated record keys.
+    assertEquals(deltaRowsMap.size, deltaRows.length)
+    assertEquals(beforeRowsMap.size, beforeRows.length)
+
+    // Check that all input rows exist in hudiRows

Review Comment:
   ```suggestion
       // Check that all input rows exist in afterRows
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to