[ https://issues.apache.org/jira/browse/SPARK-56683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18080761#comment-18080761 ]

Anupam Yadav commented on SPARK-56683:
--------------------------------------

I'd be interested in working on a fix for this. A few questions on approach:

The simplest path I see is adding source materialization for DSv2 MERGE, 
similar to what Delta does with `MergeIntoMaterializeSource`. 

This could be scoped as:
  1. Detect when the source plan is non-deterministic (or always materialize
     it, as a safe default).
  2. Insert a materialization step before `RewriteMergeIntoTable` splits the
     source into two references.
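A minimal sketch of those two steps follows, in Scala since Catalyst is written in it. It uses a toy plan model. `Plan`, `Scan`, `Project`, `Materialize`, `RewrittenMerge` and `rewriteMerge` are hypothetical names, not Catalyst APIs. The `rand(` string check is only a crude stand-in for expression-level determinism. What matters is where the wrap happens: the source is wrapped once, before it is placed into both the join position and the subquery position.

```scala
// Toy plan model for illustration only; these classes are hypothetical and are
// not Catalyst's LogicalPlan API.
object MaterializeSourceSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    // This node's own determinism; subclasses override it where needed.
    protected def selfDeterministic: Boolean = true
    // A plan is deterministic only if this node and all of its children are.
    def deterministic: Boolean = selfDeterministic && children.forall(_.deterministic)
  }

  // A table scan; concurrent writers can make two scans of it disagree.
  final case class Scan(table: String, concurrentWriters: Boolean = false) extends Plan {
    def children: Seq[Plan] = Nil
    override protected def selfDeterministic: Boolean = !concurrentWriters
  }

  // A projection; a crude textual check stands in for expression determinism.
  final case class Project(exprs: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    override protected def selfDeterministic: Boolean = !exprs.exists(_.contains("rand("))
  }

  // Hypothetical node: evaluate `child` once and serve that result to every consumer.
  final case class Materialize(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    // Every read of a materialized result sees the same rows.
    override def deterministic: Boolean = true
  }

  // Simplified shape of the rewritten MERGE: the source is referenced twice.
  final case class RewrittenMerge(joinSource: Plan, subquerySource: Plan, target: String)

  // Step 1 decides whether to materialize; step 2 wraps the source *before* it is
  // placed into both positions, so both positions share one Materialize node.
  def rewriteMerge(source: Plan, target: String, alwaysMaterialize: Boolean = false): RewrittenMerge = {
    val src = if (alwaysMaterialize || !source.deterministic) Materialize(source) else source
    RewrittenMerge(joinSource = src, subquerySource = src, target = target)
  }
}
```

Passing `alwaysMaterialize = true` skips the determinism check entirely. That trades the cost of materializing every source for not having to trust the check.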

Alternatively, the more general solution in SPARK-56685 (guaranteed CTE reuse) 
would fix this at the Catalyst level for all cases, but that's a
larger change.
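For comparison, here is a toy model of what guaranteed CTE reuse would provide: every reference to the same CTE id resolves to one computed result. `FlakySource`, `Evaluator` and `twoReferences` are hypothetical names. The sketch only illustrates the semantics, not how SPARK-56685 would actually be implemented in Catalyst.

```scala
import scala.collection.mutable
import scala.util.Random

// Toy model of "guaranteed CTE reuse"; all names here are hypothetical.
object CteReuseSketch {
  // A source that returns a different subset of its rows on every read, standing
  // in for a rand() filter or a table with concurrent writers.
  final class FlakySource(rows: Seq[Int], seed: Long) {
    private val rng = new Random(seed)
    var reads = 0
    def read(): Seq[Int] = { reads += 1; rows.filter(_ => rng.nextBoolean()) }
  }

  // Resolves CTE references; with reuse guaranteed, each CTE id is computed once.
  final class Evaluator(guaranteeReuse: Boolean) {
    private val results = mutable.Map.empty[String, Seq[Int]]
    def ref(cteId: String, source: FlakySource): Seq[Int] =
      if (guaranteeReuse) results.getOrElseUpdate(cteId, source.read())
      else source.read()
  }

  // Two references to one CTE, mirroring the join input and the subquery in the
  // rewritten MERGE. Returns both views plus how many times the source was read.
  def twoReferences(guaranteeReuse: Boolean, seed: Long): (Seq[Int], Seq[Int], Int) = {
    val source = new FlakySource(1 to 20, seed)
    val evaluator = new Evaluator(guaranteeReuse)
    val joinInput = evaluator.ref("source", source)
    val subqueryInput = evaluator.ref("source", source)
    (joinInput, subqueryInput, source.reads)
  }
}
```

With `guaranteeReuse = false`, the source is read twice and the two views generally differ. With it on, the source is read once and both references agree.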

Would a targeted DSv2 materialization approach be acceptable, or is the 
preference to solve this at the Catalyst level? Happy to start with a
design doc or prototype PR if there's interest.

> MERGE INTO TABLE reads the source twice and the two reads can disagree 
> leading to data inconsistency
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56683
>                 URL: https://issues.apache.org/jira/browse/SPARK-56683
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Major
>
> RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that 
> references the source query in two positions: once as the streamed input to 
> the join that pairs source rows with target rows, and once inside a subquery 
> that the rewrite uses to identify which rows or groups have matching source 
> rows.
> The two positions are independent reads of the same source. When the source 
> is non-deterministic — for example, a table with concurrent writers, a 
> streaming source, or a query containing expressions like rand() — the two 
> reads can observe different sets of rows. The MERGE result is then computed 
> against an inconsistent picture of the source: rows can be filtered in or out 
> by the subquery while the join sees a different set of rows, producing 
> dropped, duplicated, or wrongly-matched rows.
> The two reads of the source need to be made consistent so that both positions 
> in the rewritten plan see the same source data, regardless of source 
> determinism.
> Delta Lake resolved this in its custom MERGE implementation with
> [https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala],
> but DSv2 data sources implementing SupportsRowLevelOperations go through
> RewriteMergeIntoTable and remain exposed to this data inconsistency.
> This could be resolved with https://issues.apache.org/jira/browse/SPARK-56685
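The failure mode described above can be reproduced in a toy, group-based (copy-on-write style) MERGE with `WHEN MATCHED THEN UPDATE` and `WHEN NOT MATCHED THEN INSERT`. This is a simplification, not Spark's actual rewritten plan; `Row`, `merge` and the grouping scheme are hypothetical. The model works as follows:

- The subquery input selects the groups that contain a matching target row.
- Only those groups are rewritten.
- The join input is matched against the rewritten rows only.

```scala
// Toy copy-on-write MERGE; names and structure are hypothetical simplifications.
object InconsistentMergeSketch {
  final case class Row(key: Int, value: String, group: Int)

  // UPDATE when matched, INSERT when not matched. `subquerySource` picks the
  // affected groups; `joinSource` drives the update/insert decision.
  def merge(target: Seq[Row], subquerySource: Map[Int, String], joinSource: Map[Int, String]): Seq[Row] = {
    // Groups holding at least one target row whose key the subquery saw.
    val affected = target.filter(r => subquerySource.contains(r.key)).map(_.group).toSet
    val (rewritten, untouched) = target.partition(r => affected.contains(r.group))
    // Matched rows in rewritten groups take the join input's value, if any.
    val updated = rewritten.map(r => joinSource.get(r.key).fold(r)(v => r.copy(value = v)))
    val matchedKeys = rewritten.map(_.key).toSet
    // Join-input rows with no match among the *rewritten* rows are inserted (new group -1).
    val inserted = joinSource.collect { case (k, v) if !matchedKeys.contains(k) => Row(k, v, group = -1) }
    untouched ++ updated ++ inserted.toSeq
  }
}
```

Take `target = Seq(Row(1, "a", 0), Row(2, "b", 1))` with source `Map(2 -> "B")` passed as both inputs, and key 2 is updated in place. Two inconsistent cases follow:

- If the subquery misses key 2 but the join sees it, group 1 is never rewritten and key 2 ends up duplicated.
- In the reverse case, the update is silently dropped.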



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
