[ 
https://issues.apache.org/jira/browse/HUDI-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Y Ethan Guo updated HUDI-6886:
------------------------------
    Remaining Estimate: 2h
     Original Estimate: 2h

> Time travel queries should not use RLI
> --------------------------------------
>
>                 Key: HUDI-6886
>                 URL: https://issues.apache.org/jira/browse/HUDI-6886
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: index
>            Reporter: Amrish Lal
>            Assignee: Sagar Sumit
>            Priority: Blocker
>             Fix For: 1.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> If RLI is used to evaluate a time travel query, incorrect results will be 
> returned. The issue can be reproduced through the following steps:
>  # Create a table and add three records into the table at three different 
> time instants by running the script below three times with some time gap 
> between each run.
> {code:java}
>   //
>   // Scala script for creating a table with RLI
>   //
>   import org.apache.hudi.QuickstartUtils._
>   import scala.collection.JavaConversions._
>   import org.apache.spark.sql.SaveMode._
>   import org.apache.hudi.DataSourceReadOptions._
>   import org.apache.hudi.DataSourceWriteOptions._
>   import org.apache.hudi.config.HoodieWriteConfig._
>   import org.apache.hudi.common.model.HoodieRecord
>   val tableName = "hudi_trips_cow"
>   val basePath = "file:///Users/amrish/tables/travel"
>   val dataGen = new DataGenerator
>   // Generate inserts
>   val inserts = convertToStringList(dataGen.generateInserts(3))
>   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
>   df.write.format("hudi").
>        |   options(getQuickstartWriteConfigs).
>        |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>        |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>        |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>        |   option(TABLE_NAME, tableName).
>        |   option("hoodie.metadata.enable", "true").
>        |   option("hoodie.metadata.record.index.enable", "true").
>        |   option("hoodie.enable.data.skipping", "true").
>        |   option("hoodie.index.type", "RECORD_INDEX").
>        |   option("hoodie.metadata.secondary.record.index.enable", "true").
>        |   option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
>        |   option("hoodie.parquet.small.file.limit", "0").
>        |   option("hoodie.compact.inline.max.delta.commits", "3").
>        |   mode(Append).
>        |   save(basePath) {code}
>  # Run select query, ordered by _hoodie_commit_time asc, to see the data in 
> the table
> {code:java}
> //
> // Select query
> //
> val readOpts = Map(
>     "hoodie.metadata.enable" -> "true",
>     "hoodie.metadata.record.index.enable" -> "true",
>     "hoodie.enable.data.skipping" -> "true",
>     "hoodie.index.type" -> "RECORD_INDEX"
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) 
> {code}
>  # The results of the select query run above should look something like the 
> result shown below
> {code:java}
> +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
> |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                 
>  |_hoodie_partition_path              |_hoodie_file_name                      
>                                    |begin_lat          |begin_lon          
> |driver    |end_lat             |end_lon            |fare              |rider 
>    |ts           |uuid                                |partitionpath          
>              |
> +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
> |20230920163539097  
> |20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo
>            
> |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet 
> |0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858   
> |0.9671159942018241 
> |34.158284716382845|rider-213|1695016681714|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo
>            |
> |20230920163539097  
> |20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo
>            
> |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet 
> |0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602  
> |0.5030798142293655 |43.4923811219014  
> |rider-213|1694997420535|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo
>            |
> |20230920163539097  
> |20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|1347f595-c7f8-446b-bd49-344b514ea503-0_1-33-349_20230920163539097.parquet
>  |0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216 
> |0.42520899698713666|64.27696295884016 
> |rider-213|1695175828360|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|
> |20230920164025402  
> |20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo
>            
> |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet 
> |0.1762368947074756 |0.7942627821413218 |driver-226|0.22400157419609057 
> |0.08079517477095832|87.42041526408588 
> |rider-226|1695061974993|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo
>            |
> |20230920164025402  
> |20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo
>            
> |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet 
> |0.36519521355305173|0.9888075495133515 
> |driver-226|0.013401540991535565|0.3794482769934313 |18.56488085068272 
> |rider-226|1695131524692|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo
>            |
> |20230920164025402  
> |20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|cb40f7c0-6c4a-425a-80b2-159f92d30ec9-0_1-80-420_20230920164025402.parquet
>  |0.6220454661413275 |0.72024792576853   |driver-226|0.9048755755365163  
> |0.727695054518325  |40.613510977307   
> |rider-226|1694672409324|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|
> |20230920164221116  
> |20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai
>                   
> |24ef5d9d-f132-435d-aacc-ecb2099504fe-0_1-118-493_20230920164221116.parquet|0.06224031095826987|0.4106290929046368
>  |driver-913|0.964603455586492   |0.13957566957654388|45.40019146422721 
> |rider-913|1694676094520|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai
>                   |
> |20230920164221116  
> |20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo
>            
> |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.25252652214479043|0.33922164839486424|driver-913|0.909372837469859
>    |0.9017656600243008 |82.36411667430927 
> |rider-913|1694663124830|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo
>            |
> |20230920164221116  
> |20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo
>            
> |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.6346040067610669
>  |0.6662084366450246 |driver-913|0.9065078444936647  |0.7124299678100179 
> |5.336723040266267 
> |rider-913|1695098874341|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo
>            |
> +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
>  {code}
>  # Note that each time instance (_hoodie_commit_time) has three records. Now 
> delete one record from the middle time instance (20230920164025402). We use 
> uuid to identify the record to delete as shown below
> {code:java}
>   //
>   // Delete one records (Hard Delete)
>   //
>   val dataset = spark.sql("select * from myrli where 
> uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1)
>   val deletes = dataGen.generateDeletes(dataset.collectAsList())
>   val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
>   deleteDf.write.format("hudi").
>     option(OPERATION_OPT_KEY, "delete").
>     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>     option(TABLE_NAME, tableName).
>     mode(Append).
>     save(basePath) {code}
>  # Run the select query from step (2) above again to confirm that the record 
> was deleted.
>  # Now run the same select query, but with time travel option as shown below
> {code:java}
> //
> // Time travel query
> //
> val readOpts = Map(
>     "hoodie.metadata.enable" -> "true",
>     "hoodie.metadata.record.index.enable" -> "true",
>     "hoodie.enable.data.skipping" -> "true",
>     "hoodie.index.type" -> "RECORD_INDEX",
>     "as.of.instant" -> "20230920164025402"
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> spark.sql("select * from myrli WHERE 
> uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false) {code}
>  # Note that the first query will show result containing six records 
> including the record with uuid 097e5763-e19f-4820-9c8e-808aba60e3ff which we 
> had deleted in step 4. This is perfectly ok since the result is generated, 
> not at the latest time interval, but at time interval 20230920164025402. 
> However, the second select query does not return any results, even though the 
> second select query is also running over the dataset generated at time 
> interval 20230920164025402. The second query returns incorrect results 
> because it is using latest RLI to evaluate a query at a time instant in past.
>  
> The bug here is that we cannot use RLI to evaluate time stamp query at a past 
> interval becuase RLI only maintains the latest state. As fix, we should avoid 
> using RLI if the "as.of.instant" flag is set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to