[ 
https://issues.apache.org/jira/browse/SPARK-47019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ridvan Appa Bugis updated SPARK-47019:
--------------------------------------
    Description: 
We appear to have encountered an issue with Spark AQE's dynamic cache 
partitioning that causes incorrect *count* output values and data loss.
 
Preconditions:
 - Set up a cluster as per the environment specification
 - Prepare test data (or any dataset large enough to trigger reads by both executors)



Steps to reproduce:
 - Read parent
 - Self join parent
 - Cache + materialize parent
 - Join parent with child
 


Performing a self-join over a parentDF, then caching + materialising the DF, 
and then joining it with a childDF results in an *incorrect* count value and 
{*}missing data{*}.
 
Performing a *repartition* seems to fix the issue, most probably because it 
rearranges the underlying partitions and updates their statistics.
 
This behaviour is observed on a multi-worker cluster with a job running 2 
executors (1 per worker), when the data file is large enough to be read by both 
executors.
It is not reproducible in local mode.
 
Circumvention:
So far, disabling 
_spark.sql.optimizer.canChangeCachedPlanOutputPartitioning_ or performing a 
repartition alleviates the issue, but neither fixes the root cause.
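The workaround can be applied per session without resubmitting the job. A minimal sketch, assuming a SparkSession named `session` as in the example below (the config is a runtime SQL conf in recent Spark releases):
{code:scala}
// Stop AQE from re-partitioning the output of cached plans for this session
session.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")

// ...or equivalently at submit time:
// spark-submit --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=false ...
{code}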
 
This issue is dangerous because the data loss occurs silently; in the absence 
of proper checks it can lead to wrong behaviour/results down the line. It is 
not labelled *blocker* because the behaviour can be worked around.
 
There seems to be a file-size threshold above which data loss is observed 
(possibly implying that it occurs once both executors start reading the data 
file).
 
Minimal example:
{code:scala}
// Read parent
val parentData = session.read.format("avro").load("/data/shared/test/parent")

// Self join parent and cache + materialize
val parent = parentData.join(parentData, Seq("PID")).cache()
parent.count()

// Read child
val child = session.read.format("avro").load("/data/shared/test/child")

// Basic join
val resultBasic = child.join(
  parent,
  parent("PID") === child("PARENT_ID")
)
// Count: 16781 (Wrong)
println(s"Count no repartition: ${resultBasic.count()}")

// Repartition parent join
val resultRepartition = child.join(
  parent.repartition(),
  parent("PID") === child("PARENT_ID")
)
// Count: 50094 (Correct)
println(s"Count with repartition: ${resultRepartition.count()}")
{code}
Spark submit for this job:
{code:bash}
spark-submit \
      --class ExampleApp \
      --packages org.apache.spark:spark-avro_2.12:3.5.0 \
      --deploy-mode cluster \
      --master spark://spark-master:6066 \
      --conf spark.sql.autoBroadcastJoinThreshold=-1 \
      --conf spark.cores.max=3 \
      --driver-cores 1 \
      --driver-memory 1g \
      --executor-cores 1 \
      --executor-memory 1g \
      /path/to/test.jar
{code}
The cluster should be set up as follows (worker1(m+e), worker2(e)) so as to 
split the executors across two workers.
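For reference, the standalone layout above can be brought up with Spark's sbin scripts. A sketch, assuming the `spark-master` hostname from the submit command and the worker sizes from the environment section (2 cores/2g and 1 core/1g):
{code:bash}
# worker1 (m+e): master plus a worker
$SPARK_HOME/sbin/start-master.sh --host spark-master
$SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077 --cores 2 --memory 2g

# worker2 (e): worker only
$SPARK_HOME/sbin/start-worker.sh spark://spark-master:7077 --cores 1 --memory 1g
{code}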

I have prepared a simple GitHub repository which contains a compilable version 
of the above example.

 

> AQE dynamic cache partitioning causes SortMergeJoin to result in data loss
> --------------------------------------------------------------------------
>
>                 Key: SPARK-47019
>                 URL: https://issues.apache.org/jira/browse/SPARK-47019
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core
>    Affects Versions: 3.5.0
>         Environment: Reproduced on, so far:
>  * kubernetes deployment
>  * docker cluster deployment
> Local Cluster:
>  * master
>  * worker1 (2/2G)
>  * worker2 (1/1G)
>            Reporter: Ridvan Appa Bugis
>            Priority: Critical
>              Labels: DAG, caching, correctness, data-loss, 
> dynamic_allocation, inconsistency, partitioning



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
