[
https://issues.apache.org/jira/browse/HUDI-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17914512#comment-17914512
]
Davis Zhang commented on HUDI-8819:
-----------------------------------
Ok I have the RCA. Here is the full summary of the issue:
# *We don't support 0.x and 1.x to run concurrently and with MDT enabled.*
# *1.x with backward compatible reader should be able to read 0.x table with
no issue and no migration is required. However, spark by design does not
automatically sync for changes done by a different spark cluster/spark session,
it has some internal in-memory cache that only populate the state from the file
system once and keep reusing it. User needs to take explicit action to refresh
the cache so that those in-memory states will be recreated again from disk.*
In summary, no real issue. Nothing needs to be fixed.
*Example with Spark SQL:*
We cannot see the latest update made by another Spark session because Spark, by design, will not actively reload the latest state of the relation unless the user takes action. In SQL, the user needs to run refresh table <table name> to rebuild the in-memory state; otherwise things like the active timeline will never get refreshed and will never pick up changes made by other Spark clusters.
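For illustration, a minimal sketch of the SQL-side refresh (the table name is taken from the transcript below, and this assumes the table is registered in the Spark catalog):
{code:scala}
// After another session has committed changes, refresh the cached
// relation state before querying, otherwise the old snapshot is served:
spark.sql("REFRESH TABLE hudi_table_mor_single_partition22")
spark.sql("SELECT * FROM hudi_table_mor_single_partition22 ORDER BY id").show()
{code}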
*Example with the Spark data source API:*
Similarly, for data source reads and writes we need to do the same thing: after another Spark session has made changes, the current session must call the Spark read API again to create a new DataFrame object.
{code:java}
scala> // Define the Hudi options
scala> val hudiOptions = Map(
| "hoodie.table.name" -> "hudi_table_mor_single_partition22",
| "hoodie.datasource.write.recordkey.field" -> "id",
| "hoodie.datasource.write.partitionpath.field" -> "name",
| "hoodie.datasource.write.precombine.field" -> "ts",
| "hoodie.datasource.write.operation" -> "upsert",
| "hoodie.datasource.write.payload.class" ->
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
| "hoodie.table.type" -> "MERGE_ON_READ",
| "hoodie.compaction.payload.class" ->
"org.apache.hudi.common.model.DefaultHoodieRecordPayload",
| "hoodie.write.table.version" -> "6",
| "hoodie.metadata.enable" -> "false"
| )
hudiOptions: scala.collection.immutable.Map[String,String] =
Map(hoodie.write.table.version -> 6, hoodie.datasource.write.precombine.field
-> ts, hoodie.datasource.write.payload.class ->
org.apache.hudi.common.model.DefaultHoodieRecordPayload, hoodie.metadata.enable
-> false, hoodie.datasource.write.operation -> upsert,
hoodie.datasource.write.recordkey.field -> id, hoodie.table.name ->
hudi_table_mor_single_partition22, hoodie.table.type -> MERGE_ON_READ,
hoodie.datasource.write.partitionpath.field -> name,
hoodie.compaction.payload.class ->
org.apache.hudi.common.model.DefaultHoodieRecordPayload)
scala> // Note: in spark-shell each line is evaluated separately, so the chained read below assigns the DataFrameReader, not a DataFrame
scala> val df = spark.read
df: org.apache.spark.sql.DataFrameReader =
org.apache.spark.sql.DataFrameReader@143ec23b
scala> .format("hudi")
res0: org.apache.spark.sql.DataFrameReader =
org.apache.spark.sql.DataFrameReader@143ec23b
scala> .options(hudiOptions)
res1: org.apache.spark.sql.DataFrameReader =
org.apache.spark.sql.DataFrameReader@143ec23b
scala>
.load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22")
25/01/19 16:35:50 WARN DFSPropertiesConfiguration: Properties file
file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
25/01/19 16:35:50 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR,
please set it as the dir of hudi-defaults.conf
res2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string,
_hoodie_commit_seqno: string ... 7 more fields]
scala>
scala> // Now you can order and show the DataFrame
scala> df.orderBy("id").show()
<console>:24: error: value orderBy is not a member of
org.apache.spark.sql.DataFrameReader
df.orderBy("id").show()
^
scala> df.show()
<console>:24: error: value show is not a member of
org.apache.spark.sql.DataFrameReader
df.show()
^
scala> val df =
spark.read.format("hudi").options(hudiOptions).load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22")
df: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string,
_hoodie_commit_seqno: string ... 7 more fields]
scala> df.show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
// In another session, insert a new row with name "H".
// Directly re-running df.show() does not help:
scala> df.show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
// Must call the read API again to construct a new plan:
scala>
spark.read.format("hudi").options(hudiOptions).load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22").show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119163823366|20250119163823366...|                 1|                name=H|b5cb8a95-376a-49e...|  1| 30.0|101|   H|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
{code}
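Distilled from the transcript above, the data source pattern is (a sketch; this assumes an active SparkSession in spark-shell, and reuses the path and hudiOptions from the transcript):
{code:scala}
// A DataFrame captures the relation state (e.g. the Hudi active timeline)
// when it is created. To observe commits made by another session,
// build a new DataFrame instead of re-showing the old one:
val basePath = "file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22"

val staleDf = spark.read.format("hudi").options(hudiOptions).load(basePath)
staleDf.show()  // snapshot as of DataFrame creation

// ... another session commits new rows ...

staleDf.show()  // still the old snapshot: no automatic reload

// Re-invoke the read API to construct a fresh plan from disk:
val freshDf = spark.read.format("hudi").options(hudiOptions).load(basePath)
freshDf.show()  // now includes the new commits
{code}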
> Hudi 1.0's backward writer's UPDATE/DELETE would corrupt older versioned Hudi
> table
> -----------------------------------------------------------------------------------
>
> Key: HUDI-8819
> URL: https://issues.apache.org/jira/browse/HUDI-8819
> Project: Apache Hudi
> Issue Type: Sub-task
> Affects Versions: 1.0.0
> Reporter: Shawn Chang
> Assignee: Davis Zhang
> Priority: Blocker
> Fix For: 1.0.1
>
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Reproduction:
> # Create a table with Hudi 0.14 + Spark 3.5.0 with some rows
> # Use Hudi 1.0.0 + Spark 3.5.3 as writer, set
> .option("hoodie.write.table.version", 6) to enable backward writer
>
> # After updating some rows, read with Hudi 1.0.0 + Spark 3.5.3:
> spark.read.format("hudi").load(tablePath)
>
> # The read results from Hudi 1.0.0 + Spark 3.5.3 would only contain updated
> rows
> # The same happens with DELETE: if we delete some rows with Hudi 1.0.0 +
> Spark 3.5.3, then the Spark reader can only see the delete blocks, which
> contain zero rows
> # Older versioned Hudi reader (Athena) can still see the correct results
--
This message was sent by Atlassian Jira
(v8.20.10#820010)