Amrish Lal created HUDI-6891:
--------------------------------

             Summary: Read Optimized Queries should not use RLI
                 Key: HUDI-6891
                 URL: https://issues.apache.org/jira/browse/HUDI-6891
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Amrish Lal


A read optimized query on a MOR table with RLI (record level index) present does not 
produce correct results, because the RLI lookup cannot distinguish between records 
that are in base files and records that are in log files.

To reproduce the issue:
 # Create a MOR table and add three records to the table.
 # Delete one of the records (identified by uuid=5, for example) from the table 
(this delete will go into a log file).
 # Run the read optimized query "select * from mytable". You will see all three 
records, as the base file does not know that the record with uuid 5 was 
deleted.
 # Run the read optimized query "select * from mytable where uuid = 5". This query 
should return one record, since the record is present in the base file. However, 
if RLI is enabled, this query is evaluated against RLI and does not return 
any records. This is inconsistent with the results returned in step 
3.
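
The inconsistency between steps 3 and 4 can be illustrated with a small toy model in plain Scala. This is only a sketch, not Hudi code: Rec, FileSlice, recordIndex and the select* helpers are hypothetical names, and the model assumes that the index reflects the merged (base + log) view, so the deleted key no longer has an entry.

```scala
// Toy model only: none of these types are Hudi classes.
final case class Rec(uuid: String, rider: String)

// One file slice: records in the base file plus keys deleted via log files.
final case class FileSlice(fileId: String, base: Seq[Rec], logDeletes: Set[String])

object RoVsRli {
  val slice: FileSlice = FileSlice(
    fileId = "fg-1",
    base = Seq(Rec("3", "rider-A"), Rec("4", "rider-B"), Rec("5", "rider-C")),
    logDeletes = Set("5") // step 2: the delete lands in a log file
  )

  // Assumption: the index tracks the merged view, so uuid 5 has no entry.
  val recordIndex: Map[String, String] =
    slice.base.map(_.uuid).filterNot(slice.logDeletes).map(_ -> slice.fileId).toMap

  // A read optimized scan reads the base file only and ignores log files.
  def readOptimized(s: FileSlice): Seq[Rec] = s.base

  // Step 3: full read optimized scan.
  def selectAll: Seq[Rec] = readOptimized(slice)

  // Step 4 with index pruning: files are chosen by the index before scanning.
  def selectByKeyWithIndex(key: String): Seq[Rec] =
    recordIndex.get(key) match {
      case Some(fid) if fid == slice.fileId => readOptimized(slice).filter(_.uuid == key)
      case _                                => Seq.empty
    }

  // Step 4 without index pruning: plain filter over the base file.
  def selectByKeyWithoutIndex(key: String): Seq[Rec] =
    readOptimized(slice).filter(_.uuid == key)

  def main(args: Array[String]): Unit = {
    println(selectAll.size)                    // 3
    println(selectByKeyWithIndex("5").size)    // 0
    println(selectByKeyWithoutIndex("5").size) // 1
  }
}
```

In this model the full scan and the unpruned key lookup agree with each other, while the index-pruned lookup drops a record that the base file still contains, which is the mismatch described above.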

A spark-shell script to reproduce the issue is attached below:
{code:java}
  //
  // Scala script for creating a table with RLI
  //
  import org.apache.hudi.QuickstartUtils._
  import scala.collection.JavaConversions._
  import org.apache.spark.sql.SaveMode._
  import org.apache.hudi.DataSourceReadOptions._
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.common.model.HoodieRecord


  val tableName = "hudi_trips_cow"
  val basePath = "file:///Users/amrish/tables/morereadoptimized"
  val dataGen = new DataGenerator


  // Generate inserts
  val inserts = convertToStringList(dataGen.generateInserts(3))
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))


  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    option("hoodie.metadata.enable", "true").
    option("hoodie.metadata.record.index.enable", "true").
    option("hoodie.enable.data.skipping", "true").
    option("hoodie.index.type", "RECORD_INDEX").
    option("hoodie.metadata.secondary.record.index.enable", "true").
    option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
    option("hoodie.parquet.small.file.limit", "0").
    option("hoodie.compact.inline.max.delta.commits", "3").
    mode(Append).
    save(basePath)




//
// Select query
//
val readOpts = Map(
    "hoodie.metadata.enable" -> "true",
    "hoodie.metadata.record.index.enable" -> "true",
    "hoodie.enable.data.skipping" -> "true",
    "hoodie.index.type" -> "RECORD_INDEX"
)


val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)


tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
//spark.sql("select count(*) from myrli").show(false)




//
// Delete one record (hard delete)
//
val dataset = spark.sql("select * from myrli where uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").limit(1)
val deletes = dataGen.generateDeletes(dataset.collectAsList())
val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))


deleteDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)


spark.sql("select count(*) from myrli").show(false)


spark.sql("select driver, rider, uuid from myrli where uuid='49d34c1a-1558-4c0d-996c-1c05851f384a'").show(truncate=false)




//
// Read optimized query
//
val readOpts = Map(
    "hoodie.metadata.enable" -> "true",
    "hoodie.metadata.record.index.enable" -> "false",
    "hoodie.enable.data.skipping" -> "true",
    "hoodie.index.type" -> "RECORD_INDEX",
    "hoodie.datasource.query.type" -> "read_optimized",
)


val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)


tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
spark.sql("select * from myrli WHERE uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").show(false)
//spark.sql("select count(*) from myrli").show(false)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
