[ 
https://issues.apache.org/jira/browse/HUDI-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17914512#comment-17914512
 ] 

Davis Zhang commented on HUDI-8819:
-----------------------------------

OK, I have the RCA. Here is the full summary of the issue:
 # *We do not support running 0.x and 1.x concurrently with MDT (the metadata 
table) enabled.*
 # *1.x with the backward-compatible reader should be able to read a 0.x table 
with no issues and no migration. However, Spark by design does not 
automatically sync changes made by a different Spark cluster or Spark session: 
it keeps an internal in-memory cache that populates its state from the file 
system once and then keeps reusing it. The user needs to take explicit action 
to refresh the cache so that the in-memory state is rebuilt from disk.*

 

In summary, no real issue. Nothing needs to be fixed.

 

*Example with Spark SQL:*

We cannot see the latest update made by another Spark session because Spark, 
by design, does not actively reload the latest state of the relation unless the 
user takes action. In SQL, the user needs to run REFRESH TABLE <table name> to 
rebuild the in-memory state; otherwise things like the active timeline are 
never refreshed and never pick up changes made by other Spark clusters.
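
As a minimal sketch of that refresh step, the session that wants to see the new 
data could do something like the following. The helper name readAfterRefresh is 
hypothetical, and the table is assumed to be registered in the session catalog.

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper (not part of Hudi or Spark): refresh the state Spark has
// cached for a catalog table, then resolve the table again.
def readAfterRefresh(spark: SparkSession, tableName: String): DataFrame = {
  // Same effect as typing "REFRESH TABLE <table name>" in a SQL shell.
  spark.sql(s"REFRESH TABLE $tableName")
  // Build a new DataFrame after the refresh instead of reusing an old one.
  spark.table(tableName)
}
{code}

In spark-shell this would be called as 
readAfterRefresh(spark, "<table name>").show().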

 

*Example with the Spark data source API:*

Similarly, for data source reads and writes we need to do the same thing: after 
another Spark session has made changes, the reading session needs to call the 
Spark read API again to create a new DataFrame object.

 
{code:java}
scala> // Define the Hudi options

scala> val hudiOptions = Map(
     |   "hoodie.table.name" -> "hudi_table_mor_single_partition22",
     |   "hoodie.datasource.write.recordkey.field" -> "id",
     |   "hoodie.datasource.write.partitionpath.field" -> "name",
     |   "hoodie.datasource.write.precombine.field" -> "ts",
     |   "hoodie.datasource.write.operation" -> "upsert",
     |   "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
     |   "hoodie.table.type" -> "MERGE_ON_READ",
     |   "hoodie.compaction.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
     |   "hoodie.write.table.version" -> "6",
     |   "hoodie.metadata.enable" -> "false"
     | )
hudiOptions: scala.collection.immutable.Map[String,String] = Map(hoodie.write.table.version -> 6, hoodie.datasource.write.precombine.field -> ts, hoodie.datasource.write.payload.class -> org.apache.hudi.common.model.DefaultHoodieRecordPayload, hoodie.metadata.enable -> false, hoodie.datasource.write.operation -> upsert, hoodie.datasource.write.recordkey.field -> id, hoodie.table.name -> hudi_table_mor_single_partition22, hoodie.table.type -> MERGE_ON_READ, hoodie.datasource.write.partitionpath.field -> name, hoodie.compaction.payload.class -> org.apache.hudi.common.model.DefaultHoodieRecordPayload)

scala> // The read operation needs to be completed before assigning to val

scala> val df = spark.read
df: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@143ec23b

scala>   .format("hudi")
res0: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@143ec23b

scala>   .options(hudiOptions)
res1: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@143ec23b

scala>   .load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22")
25/01/19 16:35:50 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
25/01/19 16:35:50 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
res2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]

scala> 

scala> // Now you can order and show the DataFrame

scala> df.orderBy("id").show()
<console>:24: error: value orderBy is not a member of org.apache.spark.sql.DataFrameReader
       df.orderBy("id").show()
          ^

scala> df.show()
<console>:24: error: value show is not a member of org.apache.spark.sql.DataFrameReader
       df.show()
          ^

scala> val df = spark.read.format("hudi").options(hudiOptions).load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22")
df: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 7 more fields]

scala> df.show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+

// In another session, insert a new row with name "H".

// Re-running df.show() on the existing DataFrame does not pick up the new row.
scala> df.show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+

// Must call read API to construct a new plan.
scala> spark.read.format("hudi").options(hudiOptions).load("file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22").show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|price| ts|name|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+
|  20250119115843994|20250119115843994...|                 1|                name=F|28293ff5-43c5-40b...|  1| 30.0|101|   F|
|  20250119115253942|20250119115253942...|                 1|                name=E|07fb70ad-b248-478...|  1| 30.0|101|   E|
|  20250119163823366|20250119163823366...|                 1|                name=H|b5cb8a95-376a-49e...|  1| 30.0|101|   H|
|  20250119112546832|20250119112546832...|                 3|                name=C|0e9d58fa-b5a3-4e9...|  3| 40.0|102|   C|
|  20250119112407263|20250119112407263...|                 1|                name=A|8b01a277-9b6e-4f5...|  1| 30.0|101|   A|
|  20250119112407263|20250119112407263...|                 2|                name=B|7be14215-4dcf-4cd...|  2| 40.0|101|   B|
|  20250119154921471|20250119154921471...|                 1|                name=G|8292ab1c-b7ad-41d...|  1| 30.0|101|   G|
|  20250119115013424|20250119115013424...|                 1|                name=D|25c967a6-9944-408...|  1| 30.0|101|   D|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-----+---+----+

 {code}
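
The transcript above boils down to re-running the read instead of reusing the 
old DataFrame. A minimal sketch, assuming a hypothetical helper named freshRead 
that is called every time up-to-date data is needed:

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper (not part of Hudi or Spark): build a new DataFrame on
// every call so that each read is planned from scratch instead of reusing the
// plan held by a previously created DataFrame.
def freshRead(spark: SparkSession,
              basePath: String,
              options: Map[String, String]): DataFrame =
  spark.read.format("hudi").options(options).load(basePath)

// Usage in spark-shell, reusing hudiOptions from the transcript above:
// val tablePath = "file:///tmp/lakes/observed-default/dd/hudi_table_mor_single_partition22"
// freshRead(spark, tablePath, hudiOptions).orderBy("id").show()
{code}

Holding on to the returned DataFrame across writes from other sessions would 
bring back the stale view shown above, so the helper is meant to be called 
again for each read.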
 

 

> Hudi 1.0's backward writer's UPDATE/DELETE would corrupt older versioned Hudi 
> table
> -----------------------------------------------------------------------------------
>
>                 Key: HUDI-8819
>                 URL: https://issues.apache.org/jira/browse/HUDI-8819
>             Project: Apache Hudi
>          Issue Type: Sub-task
>    Affects Versions: 1.0.0
>            Reporter: Shawn Chang
>            Assignee: Davis Zhang
>            Priority: Blocker
>             Fix For: 1.0.1
>
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Reproduction:
>  # Create a table with Hudi 0.14 + Spark 3.5.0 and insert some rows.
>  # Use Hudi 1.0.0 + Spark 3.5.3 as the writer, setting 
> .option("hoodie.write.table.version", 6) to enable the backward writer.
>  # After updating some rows, read with Hudi 1.0.0 + Spark 3.5.3: 
> spark.read.format("hudi").load(tablePath)
>  # The read results from Hudi 1.0.0 + Spark 3.5.3 contain only the updated 
> rows.
>  # The same happens with DELETE: if we delete some rows with Hudi 1.0.0 + 
> Spark 3.5.3, the Spark reader can only see the delete blocks that contain 
> zero rows.
>  # An older-versioned Hudi reader (Athena) can still see the correct results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
