[ https://issues.apache.org/jira/browse/HUDI-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Chandar resolved HUDI-2146.
----------------------------------
    Resolution: Cannot Reproduce

Concurrent writes lose data
---------------------------

                 Key: HUDI-2146
                 URL: https://issues.apache.org/jira/browse/HUDI-2146
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Wenning Ding
            Priority: Blocker
             Fix For: 0.9.0
         Attachments: image-2021-07-08-00-49-30-730.png

Reproduction steps:

Create a Hudi table:

{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.hudi.AvroConversionUtils
import spark.implicits._ // needed for .toDF when not running in spark-shell

val df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

var tableName = "hudi_test"
var tablePath = "s3://.../" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)
{code}

Perform two insert operations at almost the same time, each inserting different data:

Insert 1:

{code:java}
val df3 = Seq(
  (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
  (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
  (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
  (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", tableName)
  .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}

Insert 2:

{code:java}
val df3 = Seq(
  (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
  (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
  (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
  (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")

// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", tableName)
  .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}
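To check which records actually landed after both writers finished, a snapshot read like the sketch below can be used (assuming the same {{spark}} session and {{tablePath}} as above; the {{/*/*}} glob depth matches the single {{event_type}} partition level created here):

{code:java}
// Snapshot-read the table: one glob level for the event_type partitions, one for the files.
val snapshot = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")

// Group by Hudi's commit-time metadata column to see which commits' records survive.
snapshot.groupBy("_hoodie_commit_time").count().show(false)

// If neither write was lost, both the 300-series and 400-series event_ids should appear.
snapshot.select("event_id", "event_name", "event_ts", "event_type").orderBy("event_id").show(false)
{code}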
.option("hoodie.write.lock.provider", > "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider") > .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal") > .option("hoodie.write.lock.zookeeper.port", "2181") > .option("hoodie.write.lock.zookeeper.lock_key", tableName) > .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock") > .mode(SaveMode.Append) > .save(tablePath) > {code} > Insert 2: > {code:java} > val df3 = Seq( > (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"), > (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"), > (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"), > (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2") > ).toDF("event_id", "event_name", "event_ts", "event_type") > // update hudi dataset > df3.write.format("org.apache.hudi") > .option(HoodieWriteConfig.TABLE_NAME, tableName) > .option(DataSourceWriteOptions.OPERATION_OPT_KEY, > DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) > .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, > DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL) > .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id") > .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") > .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts") > .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") > .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName) > .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type") > .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false") > .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, > "org.apache.hudi.hive.MultiPartKeysValueExtractor") > .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control") > .option("hoodie.cleaner.policy.failed.writes", "LAZY") > .option("hoodie.write.lock.provider", > "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider") > .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal") > .option("hoodie.write.lock.zookeeper.port", "2181") > .option("hoodie.write.lock.zookeeper.lock_key", tableName) > .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock") > .mode(SaveMode.Append) > .save(tablePath) > {code} > There's no exception/rollback during the insertions, however, when I check > the Hudi table data, it only contains one insert and the other one is missing. > Here is the timeline: > !image-2021-07-08-00-49-30-730.png|width=840,height=322! > I checked the parquet file at 20210706171250, it contains all the insertion > data. However, in the parquet file of 20210706171252, it only contains one > insert data. Also you can see, though 20210706171252 is the latest timestamp, > the commit happens before 20210706171250. > So I guess here is the process: > insert 1 get timestamp (20210706171250) -> insert 2 get timestamp > (20210706171252) -> insert 2 get the lock & insert 1 is blocked -> commit > insert 2 data to Hudi table (20210706171252) -> commit insert 1 data to Hudi > table (20210706171250) > However, when querying Hudi table, it would return the data from the latest > timestamp which is 20210706171252, so it would only return the data from > insert 2. > We need to check/update the timestamp somewhere before committing. > > > > > the -- This message was sent by Atlassian Jira (v8.3.4#803005)