[ https://issues.apache.org/jira/browse/HUDI-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bhavani Sudha updated HUDI-305:
-------------------------------
    Status: Closed  (was: Patch Available)

> Presto MOR "_rt" queries only reads base parquet file
> ------------------------------------------------------
>
>                 Key: HUDI-305
>                 URL: https://issues.apache.org/jira/browse/HUDI-305
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Presto Integration
>         Environment: On AWS EMR
>            Reporter: Brandon Scheller
>            Assignee: Bhavani Sudha
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.6.0
>
>
> Code example to reproduce:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.lit // lit() is used in the update step below
>
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
>
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
>
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
>     DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
>     DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
>     "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
>
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
>
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
>     DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
>     DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option("hoodie.compact.inline", "false")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
>     "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Append)
>   .save(tablePath)
> {code}
> When querying the real-time table from Hive, the updated value is returned
> as expected:
> {code:java}
> hive> select event_name from hudi_events_mor_1_rt;
> OK
> event_name_900
> event_name_changed
> event_name_546
> event_name_678
> Time taken: 0.103 seconds, Fetched: 4 row(s)
> {code}
> But when querying the real-time table from Presto, only the base parquet
> file is read, so the update that should be merged in from the log file is
> missing:
> {code:java}
> presto:default> select event_name from hudi_events_mor_1_rt;
>    event_name
> ----------------
>  event_name_900
>  event_name_123
>  event_name_546
>  event_name_678
> (4 rows)
> {code}
> Our current understanding of this issue is that while
> HoodieParquetRealtimeInputFormat correctly generates the splits, the
> RealtimeCompactedRecordReader is not used, so Presto reads only the base
> parquet file and never reads the log file.
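
The difference between the Hive and Presto results can be illustrated with a minimal, self-contained Scala sketch. It is only a model of the behaviour described in the issue, not Hudi or Presto code: `Event`, `MergeOnReadSketch`, `readBaseOnly` and `readMerged` are hypothetical names, all partitions are flattened into one sequence, and the log record is assumed to always win over the base record with the same key.

```scala
// Hypothetical, simplified model of a merge-on-read file slice.
// None of these names are Hudi or Presto APIs.
final case class Event(eventId: String, eventName: String, eventTs: String)

object MergeOnReadSketch {
  // A reader that ignores the log file returns the base file unchanged.
  // This mirrors the Presto result in the report.
  def readBaseOnly(base: Seq[Event]): Seq[Event] = base

  // A merging reader overlays log records on base records by record key.
  // This mirrors the Hive result in the report.
  def readMerged(base: Seq[Event], log: Seq[Event]): Seq[Event] = {
    // For duplicate keys in the log, the last entry wins (toMap keeps the last pair).
    val latestByKey: Map[String, Event] = log.map(e => e.eventId -> e).toMap
    val baseKeys: Set[String] = base.map(_.eventId).toSet
    // Base records are replaced when the log holds an update for their key.
    val updated = base.map(e => latestByKey.getOrElse(e.eventId, e))
    // Keys that exist only in the log are appended as new records.
    val inserted = log.map(_.eventId).distinct.filterNot(baseKeys).map(latestByKey)
    updated ++ inserted
  }

  def main(args: Array[String]): Unit = {
    val base = Seq(
      Event("100", "event_name_900", "2015-01-01T13:51:39.340396Z"),
      Event("101", "event_name_546", "2015-01-01T12:14:58.597216Z"),
      Event("104", "event_name_123", "2015-01-01T12:15:00.512679Z"),
      Event("105", "event_name_678", "2015-01-01T13:51:42.248818Z")
    )
    // The upsert from the reproduction steps, as it would sit in the log file.
    val log = Seq(Event("104", "event_name_changed", "2015-01-01T12:15:00.512679Z"))

    println(readBaseOnly(base).map(_.eventName))
    println(readMerged(base, log).map(_.eventName))
  }
}
```

In this sketch, `readBaseOnly` still reports `event_name_123` for key 104, while `readMerged` reports `event_name_changed`, which is the gap between the two query results shown in the issue.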