ssandona opened a new issue, #9870:
URL: https://github.com/apache/hudi/issues/9870

   I see duplicate data after upsert operations. This seems to be related to range pruning not properly using column statistics, since disabling `hoodie.bloom.index.use.metadata` before doing the upserts resolves the issue.
   
   I'm using **Hudi 0.13.1**.
   
   I'm not able to reproduce this with a small dataset, but with my dataset the issue is persistent and reproducible.
   
   Here is the observed behavior:
   
   1. Initial dataset with partition 1 (bulk inserted): 229874354 records
   2. Perform an upsert operation using a dataframe containing 9640013 records: 2223979 updates targeting partition 1 and 7416034 inserts targeting partition 2
   - Here I expect to end up with 229874354 records in partition 1 and 7416034 records in partition 2; instead I end up with 232098333 records in partition 1 and 7416034 in partition 2.
   - This means that the updates were treated as inserts.
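   The arithmetic of the observed counts supports that reading: the post-upsert count in partition 1 is exactly the baseline plus the size of the update batch, i.e. every "update" landed as a new row:
   
   ```python
   baseline_p1 = 229_874_354  # partition 1 after the initial bulk insert
   updates_p1 = 2_223_979     # update records targeting partition 1
   observed_p1 = 232_098_333  # partition 1 after the upsert
   
   # Had the updates matched their existing record keys, the count would
   # have stayed at the baseline; instead it grew by one row per update.
   assert observed_p1 == baseline_p1 + updates_p1
   print(observed_p1 - baseline_p1)  # 2223979
   ```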
   
   Here is my upsert code:
   
   ```python
   COW_TABLE_NAME = "hudi_drones_cow_table_opt"
   PARTITION_FIELD = "year,month"
   PRECOMBINE_FIELD = "timestamp"
   COW_TABLE_LOCATION = "s3://mybucket/datasets/hudi_drones_cow_table_opt/"
   
   hudi_options_opt = {
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.index.type": "BLOOM",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
   }
   
   # Create upsert_df
   
   (upsert_df.write.format("org.apache.hudi")
       .option("hoodie.datasource.write.operation", "upsert")
       .options(**hudi_options_opt)
       .mode("append")
       .save(COW_TABLE_LOCATION))
   ```
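   For reference, the workaround mentioned above amounts to a one-line override of these options (a sketch; `hudi_options_workaround` is just an illustrative name):
   
   ```python
   # Workaround: keep the metadata table enabled, but stop the bloom
   # index from reading bloom filters / column stats out of it.
   hudi_options_workaround = dict(hudi_options_opt)
   hudi_options_workaround["hoodie.bloom.index.use.metadata"] = "false"
   ```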
   
   To understand what was causing the issue, I tried multiple option combinations.
   
   ## OptionA: column stats in meta table + bloom filter in meta table
   
   ```python
   hudi_options_opt = {
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.index.type": "BLOOM",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
   }
   ```
   
   This ended up with duplicates.
   
   ## OptionB: column stats in meta table + NO bloom filter in meta table
   
   ```python
   hudi_options_opt = {
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.index.type": "BLOOM",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "false",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
   }
   ```
   
   This ended up with duplicates.
   
   ## OptionC: column stats for all columns in meta table + NO bloom filter in meta table
   
   ```python
   hudi_options_opt = {
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE", 
       "hoodie.index.type": "BLOOM",
       "hoodie.datasource.write.recordkey.field": "id", 
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
       "hoodie.datasource.write.hive_style_partitioning": "true", 
       "hoodie.datasource.hive_sync.enable": "true", 
       "hoodie.datasource.hive_sync.use_jdbc": "false", 
       "hoodie.datasource.hive_sync.mode": "hms", 
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "false",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
   }
   ```
   
   This ended up with duplicates.
   
   ## OptionD: NO column stats in meta table + bloom filter in meta table
   
   ```python
   hudi_options_opt = {
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE", 
       "hoodie.index.type": "BLOOM",
       "hoodie.datasource.write.recordkey.field": "id", 
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
       "hoodie.datasource.write.hive_style_partitioning": "true", 
       "hoodie.datasource.hive_sync.enable": "true", 
       "hoodie.datasource.hive_sync.use_jdbc": "false", 
       "hoodie.datasource.hive_sync.mode": "hms", 
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "false"
   }
   ```
   
   This ended up with no duplicates; everything is OK. BUT from the execution in the Spark UI it is not clear whether it actually used the bloom filter from the metadata table.
   
   
   [Here](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L134C1-L134C1) is the code where `findMatchingFilesForRecordKeys` is invoked.
   
   I can see that [this point](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L175) in the code is reached.
   
   I do not see any stage for the calculation of `keyLookupResultRDD`. Specifically, I would have expected to see a stage related to [this code](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L152), as [this check](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L101) should pass with the above configurations.
   
   Here are the stages in the Spark UI related to Option D:
   
   ![Screenshot 2023-10-16 at 10 47 38](https://github.com/apache/hudi/assets/5663683/2dc5cdf6-4db3-41a6-8816-bf5bba18c49a)
   
   ## Summary
   
   It seems range pruning is not working properly with column statistics, as upsert operations end up with duplicate data.
   Is there an easy way to find out whether the issue is caused by wrongly calculated statistics or by the code not using them properly? It would be great if there were an easy way to print out, for each file, which column statistics are present in the metadata table.
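   On that last point, here is a sketch of one approach that may work, assuming the metadata table (which in 0.13.x lives under `<base_path>/.hoodie/metadata`) can itself be read back as a Hudi table. The field names in the `select` are assumptions based on the metadata payload schema and should be verified against `printSchema()` first:
   
   ```python
   # Hypothetical, untested sketch: read the metadata table as a Hudi
   # table and pull out the column-stats records per file.
   meta_df = (spark.read.format("org.apache.hudi")
              .load(COW_TABLE_LOCATION + ".hoodie/metadata"))
   meta_df.printSchema()  # confirm the actual record layout first
   
   (meta_df.filter("ColumnStatsMetadata is not null")
           .select("key",
                   "ColumnStatsMetadata.fileName",
                   "ColumnStatsMetadata.minValue",
                   "ColumnStatsMetadata.maxValue")
           .show(truncate=False))
   ```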
   
   These two other issues may be related:
   - https://github.com/apache/hudi/issues/9857
   - https://github.com/apache/hudi/issues/9271
   
   

