Amrish Lal created HUDI-6891: -------------------------------- Summary: Read Optimized Queries should not use RLI Key: HUDI-6891 URL: https://issues.apache.org/jira/browse/HUDI-6891 Project: Apache Hudi Issue Type: Bug Reporter: Amrish Lal
Read optimized queries on a MOR table with RLI present don't produce correct results as RLI lookup doesn't have the ability to distinguish between records that are in base files vs records that are in log files. To reproduce the issue: # Create a MOR table and add three records to the table. # Delete one of the records (identified by uuid=5 for example) from the table (this delete will go into a log file). # Run read optimized query "select * from mytable", you will see all three records as the base file does not have the knowledge that record with id 5 was deleted. # Run read optimized query "select * from mytable where id = 5". This query should return one record since the record is present in the base file. However, if RLI is enabled this query will get evaluated against RLI and will not return any records. This appears to be inconsistent with the results returned in step 3. A spark-shell script to reproduce the issue is attached below: {code:java} // // Scala script for creating a table with RLI // import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord val tableName = "hudi_trips_cow" val basePath = "file:///Users/amrish/tables/morereadoptimized" val dataGen = new DataGenerator // Generate inserts val inserts = convertToStringList(dataGen.generateInserts(3)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1)) df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | option("hoodie.metadata.enable", "true"). | option("hoodie.metadata.record.index.enable", "true"). | option("hoodie.enable.data.skipping", "true"). 
| option("hoodie.index.type", "RECORD_INDEX"). | option("hoodie.metadata.secondary.record.index.enable", "true"). | option("hoodie.datasource.write.table.type", "MERGE_ON_READ"). | option("hoodie.parquet.small.file.limit", "0"). | option("hoodie.compact.inline.max.delta.commits", "3"). | mode(Append). | save(basePath) // // Select query // val readOpts = Map( "hoodie.metadata.enable" -> "true", "hoodie.metadata.record.index.enable" -> "true", "hoodie.enable.data.skipping" -> "true", "hoodie.index.type" -> "RECORD_INDEX" ) val tripsSnapshotDF = spark. read. format("hudi"). options(readOpts). load(basePath) tripsSnapshotDF.createOrReplaceTempView("myrli") spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) //spark.sql("select count(*) from myrli").show(false) // // Delete one record (Hard Delete) // val dataset = spark.sql("select * from myrli where uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").limit(1) val deletes = dataGen.generateDeletes(dataset.collectAsList()) val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) deleteDf.write.format("hudi"). option(OPERATION_OPT_KEY, "delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) spark.sql("select count(*) from myrli").show(false) spark.sql("select driver, rider, uuid from myrli where uuid='49d34c1a-1558-4c0d-996c-1c05851f384a'").show(truncate=false) // // Read optimized query // val readOpts = Map( "hoodie.metadata.enable" -> "true", "hoodie.metadata.record.index.enable" -> "false", "hoodie.enable.data.skipping" -> "true", "hoodie.index.type" -> "RECORD_INDEX", "hoodie.datasource.query.type" -> "read_optimized" ) val tripsSnapshotDF = spark. read. format("hudi"). options(readOpts). 
load(basePath) tripsSnapshotDF.createOrReplaceTempView("myrli") spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) spark.sql("select * from myrli WHERE uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").show(false) //spark.sql("select count(*) from myrli").show(false) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)