hudi-agent commented on code in PR #18276:
URL: https://github.com/apache/hudi/pull/18276#discussion_r3346347727


##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +49,374 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+Hudi's write path is mature, and involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be a non-starter. Also, due to the 
flexibility of the V1 API in terms of allowing the writes to shuffle data after 
the `df.write.format....save` is invoked, Hudi supports a streaming DF write 
for its upsert operation. A good majority of Hudi jobs work this way today, and 
we cannot break all of these at once.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for 
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or 
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future 
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the 
following schema:
+
+![Proposed approach with hybrid V1 write and V2 
read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the 
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be 
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write 
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + 
CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter 
`hoodie.datasource.read.use.v2`, together with the schema-on-read precedence 
and DSv2 supportability gate, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, schemaEvolutionEnabled = false,
+v2ReadEnabled = true, isSupportedByDSv2 = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+HoodieSpark{33,34,35,40,41}DataSourceV2ToV1Fallback
+converts InsertIntoStatement(DataSourceV2Relation) to a V1 LogicalRelation
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog

Review Comment:
   🤖 What happens if `isSupportedByDSv2` rejects inside `newScanBuilder` after 
the SQL/catalog path already committed to `HoodieSparkV2Table` at `loadTable` 
time? The documented "falls back to V1" only fires at `loadTable`; once 
`HoodieSparkV2Table` is returned, V1 fall-back isn't trivially available. The 
RFC says SQL/catalog `loadTable` constructs with an empty options map and the 
analyzer attaches per-query options (e.g. `TIMESTAMP AS OF`) later — so the 
merged options seen by `newScanBuilder` can differ from those seen by 
`loadTable`. Could you spell out the contract: hard error vs. silent 
passthrough, and confirm no SQL flow (analyzer-attached options, session conf 
flip between loadTable and scan time) can land us in V2 with 
`query.type=incremental` / `incremental.format=cdc` and silently produce 
snapshot results? @yihua
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to