Ridvan Appa Bugis created SPARK-47019:
-----------------------------------------

             Summary: AQE dynamic cache partitioning causes SortMergeJoin to 
result in data loss
                 Key: SPARK-47019
                 URL: https://issues.apache.org/jira/browse/SPARK-47019
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, Spark Core
    Affects Versions: 3.5.0
         Environment: Reproduced so far on:
 * kubernetes deployment
 * docker cluster deployment

Local Cluster:
 * master
 * worker1 (2/2G)
 * worker2 (1/1G)
            Reporter: Ridvan Appa Bugis


It seems that we have encountered an issue with Spark AQE's dynamic cache 
partitioning that causes incorrect *count* output values and data loss.
 
Preconditions:
- Set up a cluster as per the environment specification
- Prepare test data (or any data set large enough to trigger a read by both executors)
Steps to reproduce:
- Read parent
- Self-join parent
- Cache + materialize parent
- Join parent with child
 
Performing a self-join over a parentDF, then caching + materialising the DF, 
and then joining it with a childDF results in an *incorrect* count and 
{*}missing data{*}.
 
Performing a *repartition* seems to fix the issue, most probably because it 
rearranges the underlying partitions and updates their statistics.
 
This behaviour is observed on a multi-worker cluster with a job running 2 
executors (1 per worker), when a large enough data file is read by both 
executors.
It is not reproducible in local mode.
 
Circumvention:
So far, disabling 
_spark.sql.optimizer.canChangeCachedPlanOutputPartitioning_ or performing a 
repartition alleviates the problem, but neither fixes the root cause.
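As a stop-gap, the flag can be passed at submit time; a sketch (the class name and jar path are placeholders mirroring the submit command below):

{code:bash}
# Disable AQE's repartitioning of cached plan output at submit time
spark-submit \
      --class ExampleApp \
      --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=false \
      /path/to/test.jar
{code}

It should also be settable programmatically before the cached join runs, e.g. via session.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false").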
 
This issue is dangerous because the data loss occurs silently; in the absence 
of proper checks it can lead to wrong behaviour/results down the line. 
The issue is not labeled *blocker* because the behaviour can be worked 
around.
 
There seems to be a file-size threshold above which data loss is observed 
(possibly implying that it happens once both executors start reading the data 
file).
 
Minimal example:
{code:java}
// Read parent
val parentData = session.read.format("avro").load("/data/shared/test/parent")

// Self join parent and cache + materialize
val parent = parentData.join(parentData, Seq("PID")).cache()
parent.count()

// Read child
val child = session.read.format("avro").load("/data/shared/test/child")

// Basic join
val resultBasic = child.join(
  parent,
  parent("PID") === child("PARENT_ID")
)
// Count: 16781 (Wrong)
println(s"Count no repartition: ${resultBasic.count()}")

// Repartition parent join
val resultRepartition = child.join(
  parent.repartition(),
  parent("PID") === child("PARENT_ID")
)
// Count: 50094 (Correct)
println(s"Count with repartition: ${resultRepartition.count()}")
{code}
Spark submit for this job:
{code:java}
spark-submit \
      --class ExampleApp \
      --packages org.apache.spark:spark-avro_2.12:3.5.0 \
      --deploy-mode cluster \
      --master spark://spark-master:6066 \
      --conf spark.sql.autoBroadcastJoinThreshold=-1 \
      --conf spark.cores.max=3 \
      --driver-cores 1 \
      --driver-memory 1g \
      --executor-cores 1 \
      --executor-memory 1g \
      /path/to/test.jar
{code}
The cluster should be set up as follows (worker1: master + executor, worker2: 
executor) so as to split the executors across two workers.
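For reference, a standalone cluster matching the environment section can be brought up with the bundled scripts; a sketch, assuming the master resolves as spark-master and using the worker sizes listed above:

{code:bash}
# worker1 host: start the master (a worker runs alongside it)
$SPARK_HOME/sbin/start-master.sh

# worker1: 2 cores / 2g
$SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077 -c 2 -m 2g

# worker2: 1 core / 1g
$SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077 -c 1 -m 1g
{code}

With spark.cores.max=3 and 1 core per executor/driver as in the submit command, this sizing forces one executor onto each worker.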

I have prepared a simple GitHub repository which contains a compilable version 
of the above example.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
