[ 
https://issues.apache.org/jira/browse/HUDI-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved HUDI-2146.
----------------------------------
    Resolution: Cannot Reproduce

> Concurrent writes lose data
> ----------------------------
>
>                 Key: HUDI-2146
>                 URL: https://issues.apache.org/jira/browse/HUDI-2146
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Priority: Blocker
>             Fix For: 0.9.0
>
>         Attachments: image-2021-07-08-00-49-30-730.png
>
>
> Reproduction steps:
> Create a Hudi table:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.hudi.AvroConversionUtils
> val df = Seq(
>   (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
>   (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_test"
> var tablePath = "s3://.../" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> {code}
> Perform two insert operations at almost the same time, each inserting 
> different data (one way to launch them concurrently is sketched after 
> Insert 2):
> Insert 1:
> {code:java}
> val df3 = Seq(
>   (400, "event_name_111111", "2125-02-01T13:51:39.340396Z", "type1"),
>   (401, "event_name_222222", "2125-02-01T12:14:58.597216Z", "type2"),
>   (404, "event_name_333433", "2126-01-01T12:15:00.512679Z", "type1"),
>   (405, "event_name_666378", "2125-07-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
>    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
>    .option("hoodie.write.lock.provider", 
> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
>    .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
>    .option("hoodie.write.lock.zookeeper.port", "2181")
>    .option("hoodie.write.lock.zookeeper.lock_key", tableName)
>    .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
> Insert 2:
> {code:java}
> val df3 = Seq(
>   (300, "event_name_11111", "2035-02-01T13:51:39.340396Z", "type1"),
>   (301, "event_name_22222", "2035-02-01T12:14:58.597216Z", "type2"),
>   (304, "event_name_33333", "2036-01-01T12:15:00.512679Z", "type1"),
>   (305, "event_name_66678", "2035-07-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
>    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
>    .option("hoodie.write.lock.provider", 
> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
>    .option("hoodie.write.lock.zookeeper.url", "ip-***.ec2.internal")
>    .option("hoodie.write.lock.zookeeper.port", "2181")
>    .option("hoodie.write.lock.zookeeper.lock_key", tableName)
>    .option("hoodie.write.lock.zookeeper.base_path", "/occ_lock")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
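> To hit the race, both writes must be in flight at the same time. Here is a 
> minimal sketch of one way to do that from a single spark-shell session, 
> assuming the two write blocks above are wrapped in hypothetical helpers 
> writeInsert1() and writeInsert2():
> {code:java}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
> 
> // writeInsert1/writeInsert2 are hypothetical wrappers around the two
> // df3.write(...).save(tablePath) blocks above; running them in Futures
> // makes the two commits overlap in time.
> val w1 = Future { writeInsert1() }
> val w2 = Future { writeInsert2() }
> Await.result(Future.sequence(Seq(w1, w2)), 10.minutes)
> {code}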
> There is no exception or rollback during the insertions. However, when I 
> check the Hudi table data, it contains only one of the inserts; the other 
> is missing.
> Here is the timeline:
> !image-2021-07-08-00-49-30-730.png|width=840,height=322!
> I checked the parquet file written at 20210706171250: it contains all of 
> the inserted data. However, the parquet file written at 20210706171252 
> contains the data from only one insert. Also, as you can see, although 
> 20210706171252 is the later timestamp, its commit actually completed 
> before the 20210706171250 commit.
> So my guess is that the sequence of events was: insert 1 gets timestamp 
> 20210706171250 -> insert 2 gets timestamp 20210706171252 -> insert 2 
> acquires the lock while insert 1 is blocked -> insert 2 commits its data 
> to the Hudi table (20210706171252) -> insert 1 commits its data to the 
> Hudi table (20210706171250).
> However, when querying the Hudi table, a snapshot read returns the data 
> as of the latest commit timestamp, 20210706171252, so only the data from 
> insert 2 is visible.
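> The suspected race can be modeled outside of Hudi. The following is a toy 
> sketch, not Hudi's actual code: each writer is assigned its instant time 
> before it enters the lock, so the commit order can invert the instant-time 
> order.
> {code:java}
> import java.util.concurrent.locks.ReentrantLock
> 
> val lock = new ReentrantLock()
> val timeline = scala.collection.mutable.Buffer.empty[String]
> 
> // The instant time is fixed before the (simulated) write starts; only the
> // final commit is serialized by the lock, mirroring the guessed process.
> def write(name: String, instantTime: String, writeMillis: Long): Unit = {
>   Thread.sleep(writeMillis)   // writing parquet files takes a while
>   lock.lock()                 // acquire the table lock only to commit
>   try timeline += s"$name committed instant $instantTime"
>   finally lock.unlock()
> }
> 
> val t1 = new Thread(() => write("insert1", "20210706171250", 2000))
> val t2 = new Thread(() => write("insert2", "20210706171252", 100))
> t1.start(); t2.start(); t1.join(); t2.join()
> timeline.foreach(println)
> // insert2 committed instant 20210706171252
> // insert1 committed instant 20210706171250  <- later commit, earlier instant
> {code}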
> We need to check/update the timestamp somewhere before committing.
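> As a hedged sketch of that idea (not Hudi's actual API): before finalizing 
> a commit under the lock, the writer could fail, or take a fresh instant 
> time, if the timeline already contains a completed commit with a later 
> instant than its own, since a snapshot read would otherwise hide the write.
> {code:java}
> // Hypothetical guard, for illustration only. Instant times here are
> // yyyyMMddHHmmss strings, so lexicographic order matches time order.
> def guardCommit(myInstant: String, completedInstants: Seq[String]): Unit = {
>   val newer = completedInstants.filter(_ > myInstant)
>   if (newer.nonEmpty)
>     throw new IllegalStateException(
>       s"Instant $myInstant would be shadowed by completed instant(s): " +
>         newer.mkString(", "))
> }
> {code}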



--
This message was sent by Atlassian Jira
(v8.3.4#803005)