Ashwanisr opened a new issue #2724: URL: https://github.com/apache/hudi/issues/2724
Spark 2.4.4 runtime.

**Upsert function:**

```scala
def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
  albumDf.write
    .format("hudi")
    .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD_OPT_KEY, key)
    .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
    .option(TABLE_NAME, tableName)
    .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
    .option("hoodie.keep.min.commits", "3")
    .option("hoodie.keep.max.commits", "4")
    .option("hoodie.cleaner.commits.retained", "2")
    .option("hoodie.clean.automatic", "true")
    .mode(Append)
    .save(s"$path")
}
```

Using this upsert function I performed 4 upserts. The table state and files are:

```
hudi/Album/ 0
hudi/Album/.hoodie/ 0
hudi/Album/.hoodie/.aux/ 0
hudi/Album/.hoodie/.aux/.bootstrap/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
hudi/Album/.hoodie/.temp/ 0
hudi/Album/.hoodie/20210325102516.commit 1563
hudi/Album/.hoodie/20210325102516.commit.requested 0
hudi/Album/.hoodie/20210325102516.inflight 976
hudi/Album/.hoodie/20210325103000.commit 1573
hudi/Album/.hoodie/20210325103000.commit.requested 0
hudi/Album/.hoodie/20210325103000.inflight 976
hudi/Album/.hoodie/20210325104112.commit 1576
hudi/Album/.hoodie/20210325104112.commit.requested 0
hudi/Album/.hoodie/20210325104112.inflight 1636
hudi/Album/.hoodie/20210325104451.clean 1473
hudi/Album/.hoodie/20210325104451.clean.inflight 1442
hudi/Album/.hoodie/20210325104451.clean.requested 1442
hudi/Album/.hoodie/20210325104451.commit 1576
hudi/Album/.hoodie/20210325104451.commit.requested 0
hudi/Album/.hoodie/20210325104451.inflight 976
hudi/Album/.hoodie/archived/ 0
hudi/Album/.hoodie/hoodie.properties 226
hudi/Album/default/ 0
hudi/Album/default/.hoodie_partition_metadata 93
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-53-50_20210325103000.parquet 434781
```

Table:

```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
|     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|   6 String Theory|[Lay it down, Am ...|     18231|
|     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
```
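For reproducibility, here is a hypothetical sketch of how the first upsert batch could have been issued. The actual input data was not captured above; the schema is inferred from the table output, and the track list for album 800 is a placeholder:

```scala
import spark.implicits._

// Hypothetical input batch matching commit 20210325102516, which wrote
// records 801 and 800. Schema inferred from the table output above;
// the "track-N" entries are placeholders for the truncated track lists.
val batch1 = Seq(
  (801, "Hail to the Thief", Seq("2+2=5", "Backdrifts"), 18231),
  (800, "6 String Theory", Seq("track-1", "track-2"), 18231)
).toDF("albumId", "title", "tracks", "updateDate")

upsert(batch1, tableName, "albumId", "updateDate")
// ...followed by three more upserts adding albums 802, 803 and 804.
```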
Next I perform the delete operation using this function:

```scala
def delete(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
  albumDf.write
    .format("hudi")
    .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD_OPT_KEY, key)
    .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
    .option(TABLE_NAME, tableName)
    .option(OPERATION_OPT_KEY, "delete")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
    .option("hoodie.keep.min.commits", "3")
    .option("hoodie.keep.max.commits", "4")
    .option("hoodie.cleaner.commits.retained", "2")
    .option("hoodie.clean.automatic", "true")
    .mode(Append)
    .save(s"$path")
}
```

I fetch the row to delete with the query below:

```scala
val dataPoint = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, "0").
  option(END_INSTANTTIME_OPT_KEY, "20220319000000").
  load(path)

// create a view of the table to run a SQL query against it if required
dataPoint.createOrReplaceTempView("hudi_data_point_in_time")

// the entire row isn't necessary, we only need the keys
val ds = spark.sql("select * from hudi_data_point_in_time where albumId = 800")
ds.show()
```

```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|          title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
|     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|6 String Theory|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
```

and then delete it:

```scala
delete(ds, tableName, "albumId", "updateDate")
```

The files after the delete operation are:

```
hudi/Album/ 0
hudi/Album/.hoodie/ 0
hudi/Album/.hoodie/.aux/ 0
hudi/Album/.hoodie/.aux/.bootstrap/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
hudi/Album/.hoodie/.temp/ 0
hudi/Album/.hoodie/20210325104112.commit 1576
hudi/Album/.hoodie/20210325104112.commit.requested 0
hudi/Album/.hoodie/20210325104112.inflight 1636
hudi/Album/.hoodie/20210325104451.clean 1473
hudi/Album/.hoodie/20210325104451.clean.inflight 1442
hudi/Album/.hoodie/20210325104451.clean.requested 1442
hudi/Album/.hoodie/20210325104451.commit 1576
hudi/Album/.hoodie/20210325104451.commit.requested 0
hudi/Album/.hoodie/20210325104451.inflight 976
hudi/Album/.hoodie/20210326054248.clean 1473
hudi/Album/.hoodie/20210326054248.clean.inflight 1442
hudi/Album/.hoodie/20210326054248.clean.requested 1442
hudi/Album/.hoodie/20210326054248.commit 1573
hudi/Album/.hoodie/20210326054248.commit.requested 0
hudi/Album/.hoodie/20210326054248.inflight 1636
hudi/Album/.hoodie/archived/ 0
hudi/Album/.hoodie/archived/.commits_.archive.1_1-0-1 9785
hudi/Album/.hoodie/hoodie.properties 226
hudi/Album/default/ 0
hudi/Album/default/.hoodie_partition_metadata 93
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-32-22_20210326054248.parquet 434766
```

and the table state is:

```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
|     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
```
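Note from the listing above that after the delete the two earliest commits (20210325102516 and 20210325103000) have moved off the active timeline into `.hoodie/archived/`. A minimal sketch for listing the remaining active instants, assuming the Hadoop `FileSystem` API is reachable from the same `spark` session:

```scala
import org.apache.hadoop.fs.Path

// List completed commit/clean instant files on the active timeline;
// archived instants live under .hoodie/archived/ and will not show up here.
val hoodieDir = new Path(s"$path/.hoodie")
val fs = hoodieDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(hoodieDir)
  .map(_.getPath.getName)
  .filter(name => name.endsWith(".commit") || name.endsWith(".clean"))
  .sorted
  .foreach(println)
```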
Now I run an incremental query from begin time 0 to the last upsert timestamp. This should return rows 801, 802, 803 and 804, but it only returns rows 803 and 804:

```scala
spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, "0").
  option(END_INSTANTTIME_OPT_KEY, "20210325104452").
  load(path).
  show()
```

```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|    7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|    7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
```

Is there any reason why rows 801 and 802 are skipped by the incremental query? If I perform another upsert instead of the delete, those rows are listed.
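One way to narrow this down (a sketch, not part of the run above) would be to probe each commit window separately and see which instants still contribute rows to the incremental view:

```scala
// Probe each commit instant from the listings above with its own incremental
// window. BEGIN_INSTANTTIME is exclusive, so it is set just below the instant.
val instants = Seq("20210325102516", "20210325103000", "20210325104112", "20210325104451")
instants.foreach { instant =>
  val rows = spark.read.format("hudi").
    option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, (instant.toLong - 1).toString).
    option(END_INSTANTTIME_OPT_KEY, instant).
    load(path).
    count()
  println(s"$instant -> $rows rows")
}
```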