[ https://issues.apache.org/jira/browse/HUDI-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Y Ethan Guo updated HUDI-6886: ------------------------------ Remaining Estimate: 2h Original Estimate: 2h > Time travel queries should not use RLI > -------------------------------------- > > Key: HUDI-6886 > URL: https://issues.apache.org/jira/browse/HUDI-6886 > Project: Apache Hudi > Issue Type: Bug > Components: index > Reporter: Amrish Lal > Assignee: Sagar Sumit > Priority: Blocker > Fix For: 1.0.0 > > Original Estimate: 2h > Remaining Estimate: 2h > > If RLI is used to evaluate a time travel query, incorrect results will be > returned. The issue can be reproduced through the following steps: > # Create a table and add three records into the table at three different > time instants by running the script below three times with some time gap > between each run. > {code:java} > // > // Scala script for creating a table with RLI > // > import org.apache.hudi.QuickstartUtils._ > import scala.collection.JavaConversions._ > import org.apache.spark.sql.SaveMode._ > import org.apache.hudi.DataSourceReadOptions._ > import org.apache.hudi.DataSourceWriteOptions._ > import org.apache.hudi.config.HoodieWriteConfig._ > import org.apache.hudi.common.model.HoodieRecord > val tableName = "hudi_trips_cow" > val basePath = "file:///Users/amrish/tables/travel" > val dataGen = new DataGenerator > // Generate inserts > val inserts = convertToStringList(dataGen.generateInserts(3)) > val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1)) > df.write.format("hudi"). > | options(getQuickstartWriteConfigs). > | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). > | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). > | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). > | option(TABLE_NAME, tableName). > | option("hoodie.metadata.enable", "true"). > | option("hoodie.metadata.record.index.enable", "true"). > | option("hoodie.enable.data.skipping", "true"). > | option("hoodie.index.type", "RECORD_INDEX"). > | option("hoodie.metadata.secondary.record.index.enable", "true"). 
> | option("hoodie.datasource.write.table.type", "COPY_ON_WRITE"). > | option("hoodie.parquet.small.file.limit", "0"). > | option("hoodie.compact.inline.max.delta.commits", "3"). > | mode(Append). > | save(basePath) {code} > # Run select query, ordered by _hoodie_commit_time asc, to see the data in > the table > {code:java} > // > // Select query > // > val readOpts = Map( > "hoodie.metadata.enable" -> "true", > "hoodie.metadata.record.index.enable" -> "true", > "hoodie.enable.data.skipping" -> "true", > "hoodie.index.type" -> "RECORD_INDEX" > ) > val tripsSnapshotDF = spark. > read. > format("hudi"). > options(readOpts). > load(basePath) > tripsSnapshotDF.createOrReplaceTempView("myrli") > spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) > {code} > # The results of the select query run above should look something like the > result shown below > {code:java} > +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+ > |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key > |_hoodie_partition_path |_hoodie_file_name > |begin_lat |begin_lon > |driver |end_lat |end_lon |fare |rider > |ts |uuid |partitionpath > | > +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+ > |20230920163539097 > 
|20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo > > |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet > |0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858 > |0.9671159942018241 > |34.158284716382845|rider-213|1695016681714|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo > | > |20230920163539097 > |20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo > > |4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet > |0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602 > |0.5030798142293655 |43.4923811219014 > |rider-213|1694997420535|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo > | > |20230920163539097 > |20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|1347f595-c7f8-446b-bd49-344b514ea503-0_1-33-349_20230920163539097.parquet > |0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216 > |0.42520899698713666|64.27696295884016 > |rider-213|1695175828360|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco| > |20230920164025402 > |20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo > > |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet > |0.1762368947074756 |0.7942627821413218 |driver-226|0.22400157419609057 > |0.08079517477095832|87.42041526408588 > |rider-226|1695061974993|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo > | > |20230920164025402 > |20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo > > |6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet > |0.36519521355305173|0.9888075495133515 > |driver-226|0.013401540991535565|0.3794482769934313 |18.56488085068272 > |rider-226|1695131524692|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo > | > |20230920164025402 > 
|20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|cb40f7c0-6c4a-425a-80b2-159f92d30ec9-0_1-80-420_20230920164025402.parquet > |0.6220454661413275 |0.72024792576853 |driver-226|0.9048755755365163 > |0.727695054518325 |40.613510977307 > |rider-226|1694672409324|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco| > |20230920164221116 > |20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai > > |24ef5d9d-f132-435d-aacc-ecb2099504fe-0_1-118-493_20230920164221116.parquet|0.06224031095826987|0.4106290929046368 > |driver-913|0.964603455586492 |0.13957566957654388|45.40019146422721 > |rider-913|1694676094520|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai > | > |20230920164221116 > |20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo > > |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.25252652214479043|0.33922164839486424|driver-913|0.909372837469859 > |0.9017656600243008 |82.36411667430927 > |rider-913|1694663124830|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo > | > |20230920164221116 > |20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo > > |c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.6346040067610669 > |0.6662084366450246 |driver-913|0.9065078444936647 |0.7124299678100179 > |5.336723040266267 > |rider-913|1695098874341|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo > | > +-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+ > {code} > # Note that each time instance 
(_hoodie_commit_time) has three records. Now > delete one record from the middle time instance (20230920164025402). We use > uuid to identify the record to delete as shown below > {code:java} > // > // Delete one records (Hard Delete) > // > val dataset = spark.sql("select * from myrli where > uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1) > val deletes = dataGen.generateDeletes(dataset.collectAsList()) > val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) > deleteDf.write.format("hudi"). > option(OPERATION_OPT_KEY, "delete"). > option(PRECOMBINE_FIELD_OPT_KEY, "ts"). > option(RECORDKEY_FIELD_OPT_KEY, "uuid"). > option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). > option(TABLE_NAME, tableName). > mode(Append). > save(basePath) {code} > # Run the select query from step (2) above again to confirm that the record > was deleted. > # Now run the same select query, but with time travel option as shown below > {code:java} > // > // Time travel query > // > val readOpts = Map( > "hoodie.metadata.enable" -> "true", > "hoodie.metadata.record.index.enable" -> "true", > "hoodie.enable.data.skipping" -> "true", > "hoodie.index.type" -> "RECORD_INDEX", > "as.of.instant" -> "20230920164025402" > ) > val tripsSnapshotDF = spark. > read. > format("hudi"). > options(readOpts). > load(basePath) > tripsSnapshotDF.createOrReplaceTempView("myrli") > spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) > spark.sql("select * from myrli WHERE > uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false) {code} > # Note that the first query will show result containing six records > including the record with uuid 097e5763-e19f-4820-9c8e-808aba60e3ff which we > had deleted in step 4. This is perfectly ok since the result is generated, > not at the latest time interval, but at time interval 20230920164025402. 
> However, the second select query does not return any results, even though the > second select query is also running over the dataset generated at time > interval 20230920164025402. The second query returns incorrect results > because it is using the latest RLI to evaluate a query at a time instant in the past. > > The bug here is that we cannot use RLI to evaluate a time-travel query at a past > instant because RLI only maintains the latest state. As a fix, we should avoid > using RLI if the "as.of.instant" flag is set. -- This message was sent by Atlassian Jira (v8.20.10#820010)