[
https://issues.apache.org/jira/browse/SPARK-56683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Juliusz Sompolski updated SPARK-56683:
--------------------------------------
Description:
RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that
references the source query in two positions: once as the streamed input to the
join that pairs source rows with target rows, and once inside a subquery that
the rewrite uses to identify which rows or groups have matching source rows.
The two positions are independent reads of the same source. When the source is
non-deterministic — for example, a table with concurrent writers, a streaming
source, or a query containing expressions like rand() — the two reads can
observe different sets of rows. The MERGE result is then computed against an
inconsistent picture of the source: rows can be filtered in or out by the
subquery while the join sees a different set of rows, producing dropped,
duplicated, or wrongly-matched rows.
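The disagreement described above can be illustrated with a toy model. This is only a sketch in plain Python, not Spark's actual plan: `two_position_plan` and the `snapshots` list are hypothetical names, and the "source" is modeled as a callable that returns a different set of keys on each scan, as a table with a concurrent writer might.

```python
def two_position_plan(target_keys, scan_source):
    """Toy model of a plan that references the source in two positions."""
    # Position A: the subquery scans the source to pick target keys with a match.
    subquery_keys = set(scan_source())
    selected = target_keys & subquery_keys
    # Position B: the join scans the source again, independently.
    join_keys = set(scan_source())
    joined = target_keys & join_keys
    return selected, joined


# Hypothetical source: a concurrent writer changes the rows between scans.
snapshots = iter([[1, 2], [2, 3]])
selected, joined = two_position_plan({1, 2, 3}, lambda: next(snapshots))

only_in_subquery = selected - joined  # selected, but no source row in the join
only_in_join = joined - selected      # has a source row, but was never selected
print(only_in_subquery, only_in_join)
```

In this model, key 1 is selected by the subquery but finds no source row in the join, and key 3 has a source row in the join but was never selected. How a real plan handles such keys depends on the rewrite; the sketch only shows that the two positions can disagree.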
The two reads of the source need to be made consistent so that both positions
in the rewritten plan see the same source data, regardless of source
determinism.
Delta Lake addresses this in its custom MERGE implementation by materializing
the source; see
[https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala].
DSv2 SupportsRowLevelOperations data sources, which go through
RewriteMergeIntoTable, have no such protection and remain exposed to this data
inconsistency.
This could be resolved by https://issues.apache.org/jira/browse/SPARK-56685.
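One way to make the two reads consistent is to evaluate the source once and serve both positions from the stored result. The sketch below shows only that general idea in plain Python; it is not how Delta Lake or SPARK-56685 implement it, and `materialize` and `snapshots` are hypothetical names.

```python
def materialize(scan_source):
    """Evaluate the source once and return a scan that replays the same rows."""
    rows = list(scan_source())  # the single real read of the source
    return lambda: rows


# Hypothetical source whose contents differ on every real scan.
snapshots = iter([[1, 2], [2, 3]])
stable_scan = materialize(lambda: next(snapshots))

subquery_keys = set(stable_scan())  # position A
join_keys = set(stable_scan())      # position B sees the same rows
remaining = list(snapshots)         # the second snapshot was never consumed
print(subquery_keys == join_keys, remaining)
```

Both positions now see the first snapshot, and the underlying source is scanned exactly once. The cost is storing the source rows, which may matter for large sources.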
was:
RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that
references the source query in two positions: once as the streamed input to the
join that pairs source rows with target rows, and once inside a subquery that
the rewrite uses to identify which rows or groups have matching source rows.
The two positions are independent reads of the same source. When the source is
non-deterministic — for example, a table with concurrent writers, a streaming
source, or a query containing expressions like rand() — the two reads can
observe different sets of rows. The MERGE result is then computed against an
inconsistent picture of the source: rows can be filtered in or out by the
subquery while the join sees a different set of rows, producing dropped,
duplicated, or wrongly-matched rows.
The two reads of the source need to be made consistent so that both positions
in the rewritten plan see the same source data, regardless of source
determinism.
> MERGE INTO TABLE reads the source twice and the two reads can disagree
> leading to data inconsistency
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-56683
> URL: https://issues.apache.org/jira/browse/SPARK-56683
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Juliusz Sompolski
> Priority: Major