[
https://issues.apache.org/jira/browse/SPARK-56683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18080761#comment-18080761
]
Anupam Yadav commented on SPARK-56683:
--------------------------------------
I'd be interested in working on a fix for this. A few questions on approach:
The simplest path I see is adding source materialization for DSv2 MERGE,
similar to what Delta does with `MergeIntoMaterializeSource`.
This could be scoped as:
1. Detect when the source plan is non-deterministic (or always materialize as
   a safe default).
2. Insert a materialization step before `RewriteMergeIntoTable` splits the
   source into two references.
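To illustrate why a single materialized read is enough, here is a toy Python
model. It is not Spark code and not a proposed implementation:
`ChangingSource`, `merge`, and `materialize` are hypothetical names made up for
this sketch, and the "subquery" and "join" are plain Python stand-ins for the
two source references in the rewritten plan.

```python
from typing import Callable, Dict, List, Tuple

Row = Tuple[int, str]  # (key, value)


class ChangingSource:
    """Hypothetical source whose contents change between reads,
    standing in for a table with concurrent writers."""

    def __init__(self, snapshots: List[List[Row]]) -> None:
        self._snapshots = snapshots
        self.reads = 0

    def read(self) -> List[Row]:
        # Each read returns the next snapshot; the last snapshot repeats.
        idx = min(self.reads, len(self._snapshots) - 1)
        self.reads += 1
        return list(self._snapshots[idx])


def merge(target: Dict[int, str],
          read_source: Callable[[], List[Row]]) -> Dict[int, str]:
    """Toy upsert that consults the source in two separate places."""
    # Position 1: the "subquery" deciding which target rows have a match.
    matched_keys = {k for k, _ in read_source() if k in target}
    # Position 2: the "join" input that supplies the new values.
    join_rows = dict(read_source())

    # Target rows without a match are carried over unchanged.
    result = {k: v for k, v in target.items() if k not in matched_keys}
    for k in matched_keys:
        if k in join_rows:
            result[k] = join_rows[k]  # WHEN MATCHED THEN UPDATE
        # Otherwise the row was marked as matched but the join no longer
        # sees it, so it silently disappears from the result.
    for k, v in join_rows.items():
        if k not in target:
            result[k] = v  # WHEN NOT MATCHED THEN INSERT
    return result


def materialize(read_source: Callable[[], List[Row]]) -> Callable[[], List[Row]]:
    """Read the source once and serve every later read from that copy."""
    rows = read_source()
    return lambda: list(rows)


target = {1: "old1", 2: "old2"}
snapshots = [[(1, "new1"), (2, "new2")], [(1, "new1")]]

# Two independent reads disagree: key 2 is dropped from the target.
inconsistent = merge(target, ChangingSource(snapshots).read)

# One materialized read: both positions see the same rows.
consistent = merge(target, materialize(ChangingSource(snapshots).read))
```

With these inputs, `inconsistent` ends up as `{1: "new1"}` (the row for key 2
is lost), while `consistent` is `{1: "new1", 2: "new2"}`. Always materializing
also covers cases such as concurrent writers, which inspecting the plan for
non-deterministic expressions would not catch.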
Alternatively, the more general solution in SPARK-56685 (guaranteed CTE reuse)
would fix this at the Catalyst level for all cases, but that's a larger change.
Would a targeted DSv2 materialization approach be acceptable, or is the
preference to solve this at the Catalyst level? Happy to start with a
design doc or prototype PR if there's interest.
> MERGE INTO TABLE reads the source twice and the two reads can disagree
> leading to data inconsistency
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-56683
> URL: https://issues.apache.org/jira/browse/SPARK-56683
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Juliusz Sompolski
> Priority: Major
>
> RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that
> references the source query in two positions: once as the streamed input to
> the join that pairs source rows with target rows, and once inside a subquery
> that the rewrite uses to identify which rows or groups have matching source
> rows.
> The two positions are independent reads of the same source. When the source
> is non-deterministic — for example, a table with concurrent writers, a
> streaming source, or a query containing expressions like rand() — the two
> reads can observe different sets of rows. The MERGE result is then computed
> against an inconsistent picture of the source: rows can be filtered in or out
> by the subquery while the join sees a different set of rows, producing
> dropped, duplicated, or wrongly-matched rows.
> The two reads of the source need to be made consistent so that both positions
> in the rewritten plan see the same source data, regardless of source
> determinism.
> Delta Lake resolved this with
> [https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala]
> in their custom MERGE implementation, but DSv2 SupportsRowLevelOperations
> data sources have no equivalent safeguard and remain exposed to this data
> inconsistency.
> This could be resolved with https://issues.apache.org/jira/browse/SPARK-56685