ssandona opened a new issue, #9870: URL: https://github.com/apache/hudi/issues/9870
I see duplicate data during upsert operations. This seems to be related to range pruning not properly using column statistics: disabling `hoodie.bloom.index.use.metadata` before doing the upserts resolves the issue. I'm using **Hudi 0.13.1**. I'm not able to replicate it with a small dataset, but with my dataset the issue is persistent and reproducible.

Here is the observed behavior:

1. Initial dataset with Partition1 (bulk inserted): 229874354 records
2. Perform an upsert operation using a dataframe containing 9640013 records: 2223979 updates for partition 1 and 7416034 inserts for partition 2.
   - Here I expect to end up with 229874354 records in partition 1 and 7416034 records in partition 2; instead I end up with 232098333 records in partition 1 and 7416034 in partition 2.
   - This means that the updates were treated as inserts.

Here is my upsert code:

```
COW_TABLE_NAME = "hudi_drones_cow_table_opt"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION = "s3://mybucket/datasets/hudi_drones_cow_table_opt/"

hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
}

# Create upsert_df
(upsert_df.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))
```

To understand what was causing the issue, I tried multiple option combinations.

## OptionA: column stats in meta table + bloom filter in meta table

```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
}
```

This ended up with duplicates.

## OptionB: column stats in meta table + NO bloom filter in meta table

```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "false",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height"
}
```

This ended up with duplicates.
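The counts reported above are internally consistent with every update having been rerouted as an insert: the surplus in partition 1 equals the number of updates in the batch exactly. A quick sanity check of that arithmetic:

```python
# Sanity check: the record surplus in partition 1 after the upsert equals
# the number of updates in the batch, i.e. every update landed as a new insert.
initial_p1 = 229874354    # records bulk-inserted into partition 1
updates_p1 = 2223979      # records in the batch that should UPDATE partition 1
observed_p1 = 232098333   # records actually found in partition 1 afterwards

surplus = observed_p1 - initial_p1
assert surplus == updates_p1  # 2223979: all updates were written as inserts
print(f"partition 1 surplus: {surplus} (= number of updates in the batch)")
```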
## OptionC: column stats for all columns in meta table + NO bloom filter in meta table

```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "false",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true"
}
```

This ended up with duplicates.

## OptionD: NO column stats in meta table + bloom filter in meta table

```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "false"
}
```

This ended up with no duplicates; everything was OK. BUT from the execution in the Spark UI it is not clear whether it actually used the bloom filter from the metadata table.
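For context on why disabling column stats (OptionD) avoids the duplicates: the bloom index's range-pruning step shortlists candidate base files by comparing each incoming record key against a per-file [min, max] key range, and a key that matches no range is classified as an insert without any bloom filter ever being consulted. A minimal pure-Python sketch of that shortlisting logic (this is illustrative, not Hudi code; file names and ranges are made up):

```python
# Illustrative sketch of min/max range pruning: how wrong column stats can
# turn an update into an insert, producing a duplicate record key.

# (file_name, min_key, max_key) as a column-stats index would report them
file_ranges = [
    ("f1-0.parquet", "id-000", "id-499"),
    ("f2-0.parquet", "id-500", "id-999"),
]

def candidate_files(key, ranges):
    """Files whose [min_key, max_key] range may contain `key`."""
    return [f for f, lo, hi in ranges if lo <= key <= hi]

# Correct stats: the incoming key finds its file and is routed as an update.
assert candidate_files("id-750", file_ranges) == ["f2-0.parquet"]

# Wrong/stale stats: if the reported max for f2 were "id-599", key "id-750"
# matches no range, so the record is written as a brand-new insert -> duplicate.
bad_ranges = [
    ("f1-0.parquet", "id-000", "id-499"),
    ("f2-0.parquet", "id-500", "id-599"),
]
assert candidate_files("id-750", bad_ranges) == []
```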
[Here](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L134C1-L134C1) is the code where `findMatchingFilesForRecordKeys` is invoked. When [this point](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L175) in the code is reached, I do not see any stage for the computation of `keyLookupResultRDD`. Specifically, I would have expected to see a stage related to [this code](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L152), since [this check](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L101) should pass with the above configurations.

Here are the stages in the Spark UI for OptionD:

![Screenshot 2023-10-16 at 10 47 38](https://github.com/apache/hudi/assets/5663683/2dc5cdf6-4db3-41a6-8816-bf5bba18c49a)

## Summary

It seems range pruning is not working properly with column statistics, as upsert operations end up with duplicate data. Is there an easy way to find out whether the issue is caused by wrongly calculated statistics or by the code not using them properly? It would be great if there was an easy way to print out, for each file, which column statistics are present in the metadata table.

These 2 other issues may be related:
- https://github.com/apache/hudi/issues/9857
- https://github.com/apache/hudi/issues/9271