[
https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-45282.
---------------------------------
> Join loses records for cached datasets
> --------------------------------------
>
> Key: SPARK-45282
> URL: https://issues.apache.org/jira/browse/SPARK-45282
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1, 3.5.0
> Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or
> databricks 13.3
> Reporter: koert kuipers
> Assignee: Emil Ejbyfeldt
> Priority: Blocker
> Labels: CorrectnessBug, correctness, pull-request-available
> Fix For: 3.4.2
>
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is
> not present on spark 3.3.1.
> it only shows up in distributed environment. i cannot replicate in unit test.
> however i did get it to show up on hadoop cluster, kubernetes, and on
> databricks 13.3
> the issue is that records are dropped when two cached dataframes are joined.
> it seems in spark 3.4.1 in queryplan some Exchanges are dropped as an
> optimization while in spark 3.3.1 these Exhanges are still present. it seems
> to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on distributed cluster these settings needed are:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 1000000).toDS().map(i =>
> UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
> left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
> .toDF("key", "value1")
> .repartition(col("key")) // comment out this line to make it work
> .persist()
> println("number of left1 " + left1.count())
> val right1 = right
> .toDF("key", "value2")
> .repartition(col("key")) // comment out this line to make it work
> .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " + left1.join(right1,
> "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 1000000
> number of right 1000000
> number of (left join right) 1000000
> number of left1 1000000
> number of right1 1000000
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on
> settings and cluster size etc.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]