Ashwanisr opened a new issue #2724:
URL: https://github.com/apache/hudi/issues/2724


   **Environment:** Spark 2.4.4 runtime
   **Upsert function:**
   
   ```
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.spark.sql.{DataFrame, SaveMode}

   def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String): Unit = {
     albumDf.write
       .format("hudi")
       .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
       .option(RECORDKEY_FIELD_OPT_KEY, key)
       .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
       .option(TABLE_NAME, tableName)
       .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
       .option("hoodie.upsert.shuffle.parallelism", "2")
       .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
       .option("hoodie.keep.min.commits", "3")
       .option("hoodie.keep.max.commits", "4")
       .option("hoodie.cleaner.commits.retained", "2")
       .option("hoodie.clean.automatic", "true")
       .mode(SaveMode.Append)
       .save(path) // `path` is the table base path, defined elsewhere
   }
   ```
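
   For context, here is roughly how the four upserts were driven (a minimal sketch; the `Album` case class, the `spark` session, and the batch contents are assumptions reconstructed from the query output further below):

   ```
   // Hypothetical driver code -- the schema is inferred from the table output below.
   import spark.implicits._

   case class Album(albumId: Long, title: String, tracks: Seq[String], updateDate: Long)

   // First batch -> commit 20210325102516 (album 800 omitted here for brevity);
   // three more batches followed, producing commits 20210325103000,
   // 20210325104112 and 20210325104451.
   val batch1 = Seq(
     Album(801L, "Hail to the Thief", Seq("2+2=5", "Backdrifts"), 18231L)
   ).toDF()
   upsert(batch1, "Album", "albumId", "updateDate")
   ```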
   
   Using this upsert function I performed 4 upserts. The table state and files afterwards are:

   ```
   hudi/Album/ 0
   hudi/Album/.hoodie/ 0
   hudi/Album/.hoodie/.aux/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
   hudi/Album/.hoodie/.temp/ 0
   hudi/Album/.hoodie/20210325102516.commit 1563
   hudi/Album/.hoodie/20210325102516.commit.requested 0
   hudi/Album/.hoodie/20210325102516.inflight 976
   hudi/Album/.hoodie/20210325103000.commit 1573
   hudi/Album/.hoodie/20210325103000.commit.requested 0
   hudi/Album/.hoodie/20210325103000.inflight 976
   hudi/Album/.hoodie/20210325104112.commit 1576
   hudi/Album/.hoodie/20210325104112.commit.requested 0
   hudi/Album/.hoodie/20210325104112.inflight 1636
   hudi/Album/.hoodie/20210325104451.clean 1473
   hudi/Album/.hoodie/20210325104451.clean.inflight 1442
   hudi/Album/.hoodie/20210325104451.clean.requested 1442
   hudi/Album/.hoodie/20210325104451.commit 1576
   hudi/Album/.hoodie/20210325104451.commit.requested 0
   hudi/Album/.hoodie/20210325104451.inflight 976
   hudi/Album/.hoodie/archived/ 0
   hudi/Album/.hoodie/hoodie.properties 226
   hudi/Album/default/ 0
   hudi/Album/default/.hoodie_partition_metadata 93
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-53-50_20210325103000.parquet 434781
   ```
   
   Table:

   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   |     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
   |     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|   6 String Theory|[Lay it down, Am ...|     18231|
   |     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
   |     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
   |     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   ```
   
   Next, I perform the delete operation using the following function:
   ```
   // Same imports as the upsert function above.
   def delete(albumDf: DataFrame, tableName: String, key: String, combineKey: String): Unit = {
     albumDf.write
       .format("hudi")
       .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
       .option(RECORDKEY_FIELD_OPT_KEY, key)
       .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
       .option(TABLE_NAME, tableName)
       .option(OPERATION_OPT_KEY, "delete")
       .option("hoodie.upsert.shuffle.parallelism", "2") // deletes may instead read "hoodie.delete.shuffle.parallelism" (assumption)
       .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
       .option("hoodie.keep.min.commits", "3")
       .option("hoodie.keep.max.commits", "4")
       .option("hoodie.cleaner.commits.retained", "2")
       .option("hoodie.clean.automatic", "true")
       .mode(SaveMode.Append)
       .save(path)
   }
   ```
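
   Side note: the two functions differ only in the value passed to `OPERATION_OPT_KEY`, so they could be folded into a single helper (a sketch; `writeHudi` is a hypothetical name, and `path` is the table base path as above):

   ```
   // Hypothetical consolidation of the upsert/delete writers.
   def writeHudi(df: DataFrame, tableName: String, key: String,
                 combineKey: String, operation: String): Unit = {
     df.write
       .format("hudi")
       .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
       .option(RECORDKEY_FIELD_OPT_KEY, key)
       .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
       .option(TABLE_NAME, tableName)
       .option(OPERATION_OPT_KEY, operation) // "upsert" or "delete"
       .option("hoodie.upsert.shuffle.parallelism", "2")
       .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
       .option("hoodie.keep.min.commits", "3")
       .option("hoodie.keep.max.commits", "4")
       .option("hoodie.cleaner.commits.retained", "2")
       .option("hoodie.clean.automatic", "true")
       .mode(SaveMode.Append)
       .save(path)
   }
   ```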
   
   The DataFrame of records to delete is built with the commands below:
   ```
   import org.apache.hudi.DataSourceReadOptions._

   val dataPoint = spark.read.format("hudi").
     option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     option(BEGIN_INSTANTTIME_OPT_KEY, "0").
     option(END_INSTANTTIME_OPT_KEY, "20220319000000").
     load(path)

   // Create a temp view of the table so we can query it with SQL.
   dataPoint.createOrReplaceTempView("hudi_data_point_in_time")
   // The entire row isn't necessary; we only need the keys.
   val ds = spark.sql("select * from hudi_data_point_in_time where albumId = 800")
   ds.show()
   ```
   
   
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|          title|              tracks|updateDate|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
   |     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|6 String Theory|[Lay it down, Am ...|     18231|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
   ```
   
   Then I delete the fetched record: `delete(ds, tableName, "albumId", "updateDate")`
   
   
   Files after the delete operation are:
   
   ```
   hudi/Album/ 0
   hudi/Album/.hoodie/ 0
   hudi/Album/.hoodie/.aux/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
   hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
   hudi/Album/.hoodie/.temp/ 0
   hudi/Album/.hoodie/20210325104112.commit 1576
   hudi/Album/.hoodie/20210325104112.commit.requested 0
   hudi/Album/.hoodie/20210325104112.inflight 1636
   hudi/Album/.hoodie/20210325104451.clean 1473
   hudi/Album/.hoodie/20210325104451.clean.inflight 1442
   hudi/Album/.hoodie/20210325104451.clean.requested 1442
   hudi/Album/.hoodie/20210325104451.commit 1576
   hudi/Album/.hoodie/20210325104451.commit.requested 0
   hudi/Album/.hoodie/20210325104451.inflight 976
   hudi/Album/.hoodie/20210326054248.clean 1473
   hudi/Album/.hoodie/20210326054248.clean.inflight 1442
   hudi/Album/.hoodie/20210326054248.clean.requested 1442
   hudi/Album/.hoodie/20210326054248.commit 1573
   hudi/Album/.hoodie/20210326054248.commit.requested 0
   hudi/Album/.hoodie/20210326054248.inflight 1636
   hudi/Album/.hoodie/archived/ 0
   hudi/Album/.hoodie/archived/.commits_.archive.1_1-0-1 9785
   hudi/Album/.hoodie/hoodie.properties 226
   hudi/Album/default/ 0
   hudi/Album/default/.hoodie_partition_metadata 93
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
   hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-32-22_20210326054248.parquet 434766
   ```
   
   and the table state is:
   
   
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   |     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
   |     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
   |     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
   |     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
   ```
   
   
   Now I run an incremental query from begin time 0 to the last upsert timestamp, which should list rows 801, 802, 803 and 804, but it only lists rows 803 and 804:
   
   ```
   spark.read.format("hudi").
   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
   option(BEGIN_INSTANTTIME_OPT_KEY, "0").
   option(END_INSTANTTIME_OPT_KEY, "20210325104452").
   load(path).
   show()
   ```
   
   
   ```
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|title|              tracks|updateDate|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
   |     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|    7|[Lay it down, Am ...|     18231|
   |     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|    7|[Lay it down, Am ...|     18231|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
   ```
   
   
   Is there any reason why rows 801 and 802 are skipped by the incremental query? If I perform another upsert instead of the delete, those rows are listed.
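
   For reference, a plain snapshot read over the same path (presumably how the table state above was produced) does return all four remaining rows, so only the incremental view skips 801 and 802:

   ```
   // Default (snapshot) query -- returns albums 801, 802, 803 and 804.
   spark.read.format("hudi").load(path).orderBy("albumId").show()
   ```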
   

