Davis-Zhang-Onehouse commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1908061113
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -30,16 +30,15 @@ import
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
-
+import org.apache.hudi.DataSourceReadOptions
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage,
String basePath) {
return timeline.lastInstant().get().requestedTime();
}
+ /**
+ * Returns the last successful write operation's completed instant.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem
fs, String basePath) {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
Review Comment:
I can do that, but it will require refactoring the other consumers as well
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
-
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ // Create hashmaps for O(1) lookups
+ val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+ // Check that all input rows exist in hudiRows
+ inputRows.foreach { inputRow =>
+ val key = getRowKey(inputRow)
+ val hudiRow = hudiRows.find(row => getRowKey(row) == key)
+ assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+ s"Input row with _row_key: $key not found in Hudi rows or content
mismatch")
+ }
+
+ // Check that each hudi row either exists in input or before
+ hudiRows.foreach { hudiRow =>
+ val key = getRowKey(hudiRow)
+ val foundInInput = inputRowMap.get(key).exists(row => rowsEqual(hudiRow,
row))
+ val foundInBefore = !foundInInput && beforeRowMap.get(key).exists(row =>
rowsEqual(hudiRow, row))
+
+ assertTrue(foundInInput || foundInBefore,
+ s"Hudi row with _row_key: $key not found in either input or before
rows")
+ }
+ }
+
+ // Helper function to check if two rows are equal (comparing only the
columns we care about)
+ def rowsEqual(row1: Row, row2: Row): Boolean = {
+ // Get schemas from rows
+ val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+ val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+ // Verify schemas are identical
+ if (schema1 != schema2) {
+ throw new AssertionError(
+ s"""Schemas are different:
+ |Schema 1: ${schema1.treeString}
+ |Schema 2: ${schema2.treeString}""".stripMargin)
+ }
+
+ // Compare all fields using schema
+ schema1.fields.forall { field =>
+ val idx1 = row1.fieldIndex(field.name)
+ val idx2 = row2.fieldIndex(field.name)
+ row1.get(idx1) == row2.get(idx2)
+ }
+ }
+
+ def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows:
Array[Row], beforeRows: Array[Row]): Unit = {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT,
isMetadataEnabled, 1)
+ val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+ inputDf0.unpersist(true)
assertTrue(hasNewCommits(fs, tableBasePath, "000"))
//Verify bulk insert works correctly
- val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf1.count())
- compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
- snapshotDf1.unpersist(true)
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf1Rows =
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+ assertEquals(10, snapshotDf1.count())
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
//Test updated records
- val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+ val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled,
2)
- val commitInstantTime2 = latestCommit(fs, tableBasePath)
- val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf2.count())
- compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
- snapshotDf2.unpersist(true)
+ val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs,
tableBasePath)
+ val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+ val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf2Rows =
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+ assertEquals(10, snapshotDf2Rows.length)
+ compareUpdateRowsWithHudiRows(
+ canonicalizeDF(updateDf).collect(),
+ snapshotDf2Rows,
+ snapshotDf1Rows)
+ updateDf.unpersist(true)
- val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+ val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled,
3)
- val commitInstantTime3 = latestCommit(fs, tableBasePath)
+ val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs,
tableBasePath)
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage,
String basePath) {
return timeline.lastInstant().get().requestedTime();
}
+ /**
+ * Returns the last successful write operation's completed instant.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem
fs, String basePath) {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
-
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ // Create hashmaps for O(1) lookups
+ val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+ // Check that all input rows exist in hudiRows
+ inputRows.foreach { inputRow =>
Review Comment:
I was thinking the same, but it is not the case:
we would need to keep 3 indices for the 3 arrays and search both inputRows and
beforeRows for each row in hudiRows. We would also need to handle the various
cases where the key cannot be found; in total it leads to ~100 lines of code.
I can do that if required, but the current version is the most concise (though
not the most efficient — which is fine, since we are only handling a couple of
hundred rows).
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT,
isMetadataEnabled, 1)
+ val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+ inputDf0.unpersist(true)
assertTrue(hasNewCommits(fs, tableBasePath, "000"))
//Verify bulk insert works correctly
- val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf1.count())
- compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
- snapshotDf1.unpersist(true)
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf1Rows =
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+ assertEquals(10, snapshotDf1.count())
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
//Test updated records
- val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+ val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled,
2)
- val commitInstantTime2 = latestCommit(fs, tableBasePath)
- val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf2.count())
- compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
- snapshotDf2.unpersist(true)
+ val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs,
tableBasePath)
Review Comment:
done for all
##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage,
String basePath) {
return timeline.lastInstant().get().requestedTime();
}
+ /**
+ * Returns the last successful write operation's completed instant.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem
fs, String basePath) {
+ HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
+ return timeline.lastInstant().get();
+ }
+
+ public static HoodieInstant
latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -76,14 +76,22 @@ public static List<String> listCommitsSince(HoodieStorage
storage, String basePa
// this is used in the integration test script:
docker/demo/sparksql-incremental.commands
public static List<String> listCompletionTimeSince(FileSystem fs, String
basePath,
- String instantTimestamp) {
+ String instantTimestamp) {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA,
0xDEED)
//Bulk insert first set of records
- val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+ val inputDf0 = generateInserts(dataGen, "000", 10).cache()
insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT,
isMetadataEnabled, 1)
+ val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+ inputDf0.unpersist(true)
assertTrue(hasNewCommits(fs, tableBasePath, "000"))
//Verify bulk insert works correctly
- val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf1.count())
- compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
- snapshotDf1.unpersist(true)
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf1Rows =
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+ assertEquals(10, snapshotDf1.count())
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
//Test updated records
- val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+ val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled,
2)
- val commitInstantTime2 = latestCommit(fs, tableBasePath)
- val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf2.count())
- compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
- snapshotDf2.unpersist(true)
+ val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs,
tableBasePath)
+ val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+ val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf2Rows =
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+ assertEquals(10, snapshotDf2Rows.length)
+ compareUpdateRowsWithHudiRows(
+ canonicalizeDF(updateDf).collect(),
+ snapshotDf2Rows,
+ snapshotDf1Rows)
+ updateDf.unpersist(true)
- val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+ val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled,
3)
- val commitInstantTime3 = latestCommit(fs, tableBasePath)
+ val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs,
tableBasePath)
+ val commitCompletionTime3 = commitCompletedInstant3.getCompletionTime
assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
- val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf3.count())
- compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
- snapshotDf3.unpersist(true)
+ val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName,
isMetadataEnabled)).collect()
+ assertEquals(10, snapshotDf3Rows.length)
+ compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(),
+ snapshotDf3Rows, snapshotDf3Rows)
+ inputDf2.unpersist(true)
// Read Incremental Query, uses hudi_table_changes() table valued function
for spark sql
// we have 2 commits, try pulling the first commit (which is not the
latest)
//HUDI-5266
- val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+ val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath,
"000").get(0)
+ val firstCommit = firstCommitInstant.getCompletionTime
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}
+ private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row],
timeTravelDfRows: Array[Row]): Unit = {
Review Comment:
done
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
}
+ private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row],
timeTravelDfRows: Array[Row]): Unit = {
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]