[https://issues.apache.org/jira/browse/SPARK-47019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Ridvan Appa Bugis updated SPARK-47019:
--------------------------------------
Priority: Blocker (was: Critical)
> AQE dynamic cache partitioning causes SortMergeJoin to result in data loss
> --------------------------------------------------------------------------
>
> Key: SPARK-47019
> URL: https://issues.apache.org/jira/browse/SPARK-47019
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, Spark Core
> Affects Versions: 3.5.0
> Environment: Reproduced on, so far:
> * kubernetes deployment
> * docker cluster deployment
> Local Cluster:
> * master
> * worker1 (2/2G)
> * worker2 (1/1G)
> Reporter: Ridvan Appa Bugis
> Priority: Blocker
> Labels: DAG, caching, correctness, data-loss, dynamic_allocation, inconsistency, partitioning
> Attachments: Screenshot 2024-02-07 at 20.09.44.png, Screenshot 2024-02-07 at 20.10.07.png, testdata.zip
>
>
> It seems we have encountered an issue with Spark AQE's dynamic cache
> partitioning that causes incorrect *count* output values and data loss.
> No similar issue could be found, so I am creating this ticket to raise
> awareness.
>
> Preconditions:
> - Set up a cluster as per the environment specification
> - Prepare the test data (or a dataset large enough to be read by both
> executors)
> Steps to reproduce:
> - Read parent
> - Self join parent
> - cache + materialize parent
> - Join parent with child
>
> Performing a self-join over a parentDF, then caching and materializing the DF,
> and then joining it with a childDF results in an *incorrect* count value and
> {*}missing data{*}.
>
> Performing a *repartition* seems to fix the issue, most probably due to the
> rearrangement of the underlying partitions and the resulting statistics update.
>
> This behaviour is observed on a multi-worker cluster with a job running 2
> executors (1 per worker), when the data file is large enough to be read by
> both executors.
> It is not reproducible in local mode.
>
> Circumvention:
> So far, the issue can be alleviated by disabling
> _spark.sql.optimizer.canChangeCachedPlanOutputPartitioning_ or by performing a
> repartition, but neither of these fixes the root cause.
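>
> A minimal sketch of the first workaround, intended to replace the session creation inside the application's main method. Only the configuration key comes from this report; the builder call chain and app name are illustrative. It assumes the flag is consulted when the cached plan is built, so it has to be in effect before parent.cache() is called:
> {code:java}
> import org.apache.spark.sql.SparkSession
>
> // Workaround only, not a root-cause fix: stop AQE from changing the output
> // partitioning of cached plans. Assumed to be read when the cached plan is
> // built, so it must be in effect before parent.cache() is called.
> val session = SparkSession.builder()
>   .appName("ExampleApp")
>   .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
>   .getOrCreate()
>
> // Alternatively, set it on an existing session before caching:
> // session.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
> // or pass it to spark-submit:
> // --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=false
> {code}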
>
> This issue is dangerous because the data loss occurs silently and, in the
> absence of proper checks, can lead to wrong behaviour/results further down the
> line. This issue is not labeled *blocker* because the behaviour can be
> worked around.
>
> There seems to be a file-size threshold above which data loss is observed
> (possibly implying that it happens once both executors start reading the data
> file).
>
> Minimal example:
> {code:java}
> import org.apache.spark.sql.SparkSession
>
> // Entry point matching --class ExampleApp in the spark-submit command below
> object ExampleApp {
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder().appName("ExampleApp").getOrCreate()
>
>     // Read parent
>     val parentData = session.read.format("avro").load("/data/shared/test/parent")
>     // Self join parent and cache + materialize
>     val parent = parentData.join(parentData, Seq("PID")).cache()
>     parent.count()
>     // Read child
>     val child = session.read.format("avro").load("/data/shared/test/child")
>     // Basic join
>     val resultBasic = child.join(
>       parent,
>       parent("PID") === child("PARENT_ID")
>     )
>     // Count: 16479 (Wrong)
>     println(s"Count no repartition: ${resultBasic.count()}")
>     // Repartition parent join
>     val resultRepartition = child.join(
>       parent.repartition(),
>       parent("PID") === child("PARENT_ID")
>     )
>     // Count: 50094 (Correct)
>     println(s"Count with repartition: ${resultRepartition.count()}")
>
>     session.stop()
>   }
> }
> {code}
>
> Invalid count-only DAG:
> !Screenshot 2024-02-07 at 20.10.07.png|width=519,height=853!
> Valid repartition DAG:
> !Screenshot 2024-02-07 at 20.09.44.png|width=368,height=1219!
>
> Spark submit for this job:
> {code:java}
> spark-submit \
>   --class ExampleApp \
>   --packages org.apache.spark:spark-avro_2.12:3.5.0 \
>   --deploy-mode cluster \
>   --master spark://spark-master:6066 \
>   --conf spark.sql.autoBroadcastJoinThreshold=-1 \
>   --conf spark.cores.max=3 \
>   --driver-cores 1 \
>   --driver-memory 1g \
>   --executor-cores 1 \
>   --executor-memory 1g \
>   /path/to/test.jar
> {code}
> The cluster should be set up as follows, worker1 (m+e) and worker2 (e), so as
> to split the executors across the two workers.
> I have prepared a simple GitHub repository that contains a compilable version
> of the above example:
> [https://github.com/ridvanappabugis/spark-3.5-issue]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)