[ 
https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769893#comment-17769893
 ] 

koert kuipers commented on SPARK-45282:
---------------------------------------

yes i can reproduce it.

master branch on commit:

 
{code:java}
commit 7e8aafd2c0f1f6fcd03a69afe2b85fd3fda95d20 (HEAD -> master, upstream/master)
Author: lanmengran1 <lanmengr...@jd.com>
Date:   Tue Sep 26 21:01:02 2023 -0500

    [SPARK-45334][SQL] Remove misleading comment in parquetSchemaConverter
{code}
i built spark for k8s using:

{code:java}
$ dev/make-distribution.sh --name kubernetes --tgz -Pkubernetes -Phadoop-cloud
{code}
created a docker image using the Dockerfile provided in 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile

 

launch pod and shell inside:

 
{code:java}
185@proxy:~/work-dir$ export SPARK_LOCAL_HOSTNAME=$(hostname -i)
185@proxy:~/work-dir$ export SPARK_PUBLIC_DNS=$(hostname -i)
185@proxy:~/work-dir$ /opt/spark/bin/spark-shell \
  --master k8s://https://kubernetes.default:443 \
  --deploy-mode client \
  --num-executors 4 \
  --executor-memory 2G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kubernetes.namespace=default \
  --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=33554432 \
  --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=true \
  --conf spark.kubernetes.container.image=111111111111.dkr.ecr.us-east-1.amazonaws.com/spark:4.0.0-SNAPSHOT
23/09/28 03:44:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/
         
Using Scala version 2.13.11 (OpenJDK 64-Bit Server VM, Java 21)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://10.177.71.94:4040
Spark context available as 'sc' (master = k8s://https://kubernetes.default:443, app id = spark-5ab0957571944828866a2f23068ff180).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.util.UUID
import org.apache.spark.sql.functions.col
import spark.implicits._

val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()
val left = data.map(k => (k, 1))
val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
println("number of left " + left.count())
println("number of right " + right.count())
println("number of (left join right) " +
  left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count()
)

val left1 = left
  .toDF("key", "vertex")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of left1 " + left1.count())
val right1 = right
  .toDF("key", "state")
  .repartition(col("key")) // comment out this line to make it work
  .persist()
println("number of right1 " + right1.count())
println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result

// Exiting paste mode, now interpreting.
23/09/28 03:45:30 WARN TaskSetManager: Stage 0 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
23/09/28 03:45:34 WARN TaskSetManager: Stage 1 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of left 1000000
23/09/28 03:45:36 WARN TaskSetManager: Stage 4 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of right 1000000
23/09/28 03:45:39 WARN TaskSetManager: Stage 7 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
23/09/28 03:45:40 WARN TaskSetManager: Stage 8 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of (left join right) 1000000
23/09/28 03:45:45 WARN TaskSetManager: Stage 16 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of left1 1000000
23/09/28 03:45:48 WARN TaskSetManager: Stage 24 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB.
number of right1 1000000
number of (left1 join right1) 850735
import java.util.UUID
import org.apache.spark.sql.functions.col
import spark.implicits._
val data: org.apache.spark.sql.Dataset[String] = [value: string]
val left: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
val right: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
val left1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, vertex: int]
val right1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, state: string]

scala>   {code}
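
a possible follow-up in the same shell session, as a sketch and not something that was run for this comment: print the physical plan of the failing join to check whether an Exchange sits above each cached scan, and use a left_anti join to count the keys of left1 that find no partner in right1. the names joined and missing are made up for illustration, and it is an assumption that the anti join hits the same problem; since it reads the same cached, repartitioned inputs it may report a different number than the inner join count above.

{code:java}
// assumes left1 and right1 from the paste above are still defined and cached
val joined = left1.join(right1, "key")

// formatted physical plan; look for Exchange nodes above the InMemoryTableScan nodes.
// with AQE enabled this may print the initial plan rather than the final adaptive plan;
// the SQL tab of the web UI shows the plan that actually ran
joined.explain("formatted")

// every key exists on both sides, so a correct join leaves nothing behind here
val missing = left1.join(right1, Seq("key"), "left_anti")
println("number of left1 keys without a match in right1 " + missing.count())
{code}

if the anti join reports a non-zero count, missing.show(5, false) would list a few of the affected keys.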
 

 

> Join loses records for cached datasets
> --------------------------------------
>
>                 Key: SPARK-45282
>                 URL: https://issues.apache.org/jira/browse/SPARK-45282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0
>         Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or 
> databricks 13.3
>            Reporter: koert kuipers
>            Priority: Major
>              Labels: CorrectnessBug, correctness
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is 
> not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit 
> test. however i did get it to show up on a hadoop cluster, on kubernetes, and on 
> databricks 13.3.
> the issue is that records are dropped when two cached dataframes are joined. 
> it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as 
> an optimization, while in spark 3.3.1 these Exchanges are still present. it seems 
> to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster the settings needed are:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result
> {code}
> this produces the following output:
> {code:java}
> number of left 1000000
> number of right 1000000
> number of (left join right) 1000000
> number of left1 1000000
> number of right1 1000000
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on 
> settings and cluster size etc.
>  
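
why would a dropped Exchange lose rows instead of just changing performance? one possible mechanism, offered purely as an assumption and not established anywhere in this ticket: if both cached sides are hash partitioned on key but their partitions are then merged with different boundaries, a join that pairs partition i with partition i without a fresh shuffle will never compare keys that ended up at different indexes. that would also be consistent with the observation that k => (k, 1) on both sides works, since equally sized inputs would presumably be merged the same way. the toy model below uses plain scala collections, no spark, and every name in it (CoalesceMismatch, hashPartition, coalesce, zipJoinCount, groupsA, groupsB) is hypothetical.

{code:java}
// toy model in plain scala, no spark involved; every name here is hypothetical
object CoalesceMismatch {
  type Row = (Int, String)

  // stand-in for repartition(col("key")): bucket rows by key modulo numPartitions
  def hashPartition(rows: Seq[Row], numPartitions: Int): Vector[Seq[Row]] = {
    val byPartition = rows.groupBy { case (key, _) => Math.floorMod(key, numPartitions) }
    Vector.tabulate(numPartitions)(p => byPartition.getOrElse(p, Seq.empty[Row]))
  }

  // stand-in for partition coalescing: merge contiguous partitions, one output partition per range
  def coalesce(parts: Vector[Seq[Row]], groups: Seq[Range]): Vector[Seq[Row]] =
    groups.map(group => group.flatMap(i => parts(i))).toVector

  // join partition i of the left only against partition i of the right (no exchange in between)
  def zipJoinCount(left: Vector[Seq[Row]], right: Vector[Seq[Row]]): Int =
    left.zip(right).map { case (l, r) =>
      val rightKeys = r.map(_._1).toSet
      l.count { case (key, _) => rightKeys.contains(key) }
    }.sum

  // returns (count with identical merging on both sides, count with different merging)
  def run(): (Int, Int) = {
    val keys = 0 until 1000
    val left = hashPartition(keys.map(k => (k, "1")), 8)
    val right = hashPartition(keys.map(k => (k, k.toString)), 8)
    val groupsA = Seq(0 to 3, 4 to 7) // partitions 0-3 and 4-7 merged
    val groupsB = Seq(0 to 1, 2 to 7) // partitions 0-1 and 2-7 merged
    val consistent = zipJoinCount(coalesce(left, groupsA), coalesce(right, groupsA))
    val mismatched = zipJoinCount(coalesce(left, groupsA), coalesce(right, groupsB))
    (consistent, mismatched)
  }

  def main(args: Array[String]): Unit = {
    val (consistent, mismatched) = run()
    println("same merging on both sides: " + consistent)      // prints 1000
    println("different merging on each side: " + mismatched)  // prints 750
  }
}
{code}

in this model the keys in original partitions 2 and 3 sit in merged partition 0 on the left but in merged partition 1 on the right, so 250 of the 1000 keys never meet their partner. whether this is what actually happens inside spark for the reproduction above is not confirmed here.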



