nsivabalan commented on code in PR #6615: URL: https://github.com/apache/hudi/pull/6615#discussion_r965133110
########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); Review Comment: insertDf ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0; incrementalQuery(spark, tablePath, tableName); pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); + Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, 
partitionpath, rider, ts, uuid FROM hudi_ro_table"); + Dataset<Row> deleteQueryIn = delete(spark, tablePath, tableName); Review Comment: deletedDf ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0; incrementalQuery(spark, tablePath, tableName); pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); + Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"); + Dataset<Row> deleteQueryIn = delete(spark, tablePath, tableName); queryData(spark, jsc, tablePath, tableName, dataGen); + assert beforeDelete.except(deleteQueryIn).except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + 
Dataset<Row> beforeOverwrite = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"); + Dataset<Row> overwriteDataIn = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> afterOverwrite = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"); + Dataset<Row> overwriteIntersect = beforeOverwrite.intersect(afterOverwrite); + assert afterOverwrite.except(overwriteIntersect).except(overwriteDataIn).count() == 0; queryData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> beforeDeleteByPartition = spark.sql( + "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table WHERE partitionpath NOT IN (" + + String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS) + ")"); deleteByPartition(spark, tablePath, tableName); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(beforeDeleteByPartition).count() == 0; Review Comment: let's ensure we don't delete all partition paths, but just 1 or 2. 
########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0; incrementalQuery(spark, tablePath, tableName); pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); + Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"); Review Comment: snapshotBeforeDeleteDf ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -171,16 +185,19 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t .option(TBL_NAME.key(), tableName) .mode(Append) .save(tablePath); + return df; } /** * Deleta data based in data information. 
*/ - public static void delete(SparkSession spark, String tablePath, String tableName) { + public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) { Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*"); roViewDF.createOrReplaceTempView("hudi_ro_table"); - Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2"); + //Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2"); + Dataset<Row> ret = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2"); Review Comment: minor. rename `ret` -> `toBeDeletedDf` ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -171,16 +185,19 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t .option(TBL_NAME.key(), tableName) .mode(Append) .save(tablePath); + return df; } /** * Deleta data based in data information. */ - public static void delete(SparkSession spark, String tablePath, String tableName) { + public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) { Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*"); roViewDF.createOrReplaceTempView("hudi_ro_table"); - Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2"); + //Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2"); Review Comment: remove the commented-out code. 
########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0; incrementalQuery(spark, tablePath, tableName); pointInTimeQuery(spark, tablePath, tableName); - delete(spark, tablePath, tableName); + Dataset<Row> beforeDelete = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table"); Review Comment: if repeating, declare a constant and re-use `SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table` ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final 
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); Review Comment: updateDf ########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; - updateData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> updateQueryDataIn = updateData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table").except(insertQueryDataIn).except(updateQueryDataIn).count() == 0; Review Comment: this might need some fixes. consider all error paths. 
########## hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java: ########## @@ -65,30 +66,42 @@ public static void main(String[] args) { public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) { final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); - insertData(spark, jsc, tablePath, tableName, dataGen); + Dataset<Row> insertQueryDataIn = insertData(spark, jsc, tablePath, tableName, dataGen); queryData(spark, jsc, tablePath, tableName, dataGen); + assert insertQueryDataIn.except(spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table")).count() == 0; Review Comment: probably you can separate out into two lines. val hudiDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table") assert insertDf.except(hudiDf).count == 0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org