yihua commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1908029362
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}
+ private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row],
timeTravelDfRows: Array[Row]): Unit = {
Review Comment:
```suggestion
private def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row],
actualRows: Array[Row]): Unit = {
```
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
-
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ // Create hashmaps for O(1) lookups
+ val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+ // Check that all input rows exist in hudiRows
+ inputRows.foreach { inputRow =>
+ val key = getRowKey(inputRow)
+ val hudiRow = hudiRows.find(row => getRowKey(row) == key)
+ assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+ s"Input row with _row_key: $key not found in Hudi rows or content
mismatch")
+ }
+
+ // Check that each hudi row either exists in input or before
+ hudiRows.foreach { hudiRow =>
+ val key = getRowKey(hudiRow)
+ val foundInInput = inputRowMap.get(key).exists(row => rowsEqual(hudiRow,
row))
+ val foundInBefore = !foundInInput && beforeRowMap.get(key).exists(row =>
rowsEqual(hudiRow, row))
+
+ assertTrue(foundInInput || foundInBefore,
+ s"Hudi row with _row_key: $key not found in either input or before
rows")
+ }
+ }
+
+ // Helper function to check if two rows are equal (comparing only the
columns we care about)
+ def rowsEqual(row1: Row, row2: Row): Boolean = {
+ // Get schemas from rows
+ val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+ val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+ // Verify schemas are identical
+ if (schema1 != schema2) {
+ throw new AssertionError(
+ s"""Schemas are different:
+ |Schema 1: ${schema1.treeString}
+ |Schema 2: ${schema2.treeString}""".stripMargin)
+ }
+
+ // Compare all fields using schema
+ schema1.fields.forall { field =>
+ val idx1 = row1.fieldIndex(field.name)
+ val idx2 = row2.fieldIndex(field.name)
+ row1.get(idx1) == row2.get(idx2)
+ }
+ }
+
+ def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows:
Array[Row], beforeRows: Array[Row]): Unit = {
Review Comment:
Could you name them properly based on how they are used?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
-
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ // Create hashmaps for O(1) lookups
+ val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+ // Check that all input rows exist in hudiRows
+ inputRows.foreach { inputRow =>
+ val key = getRowKey(inputRow)
+ val hudiRow = hudiRows.find(row => getRowKey(row) == key)
+ assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+ s"Input row with _row_key: $key not found in Hudi rows or content
mismatch")
+ }
+
+ // Check that each hudi row either exists in input or before
+ hudiRows.foreach { hudiRow =>
+ val key = getRowKey(hudiRow)
+ val foundInInput = inputRowMap.get(key).exists(row => rowsEqual(hudiRow,
row))
+ val foundInBefore = !foundInInput && beforeRowMap.get(key).exists(row =>
rowsEqual(hudiRow, row))
+
+ assertTrue(foundInInput || foundInBefore,
+ s"Hudi row with _row_key: $key not found in either input or before
rows")
+ }
+ }
+
+ // Helper function to check if two rows are equal (comparing only the
columns we care about)
+ def rowsEqual(row1: Row, row2: Row): Boolean = {
+ // Get schemas from rows
+ val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+ val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+ // Verify schemas are identical
+ if (schema1 != schema2) {
+ throw new AssertionError(
+ s"""Schemas are different:
+ |Schema 1: ${schema1.treeString}
+ |Schema 2: ${schema2.treeString}""".stripMargin)
+ }
+
+ // Compare all fields using schema
+ schema1.fields.forall { field =>
+ val idx1 = row1.fieldIndex(field.name)
+ val idx2 = row2.fieldIndex(field.name)
+ row1.get(idx1) == row2.get(idx2)
+ }
+ }
+
+ def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows:
Array[Row], beforeRows: Array[Row]): Unit = {
Review Comment:
```suggestion
def compareUpdateRowsWithHudiRows(expectedRows: Array[Row],
actualUpdateRows: Array[Row], actualRows: Array[Row]): Unit = {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]