[jira] [Comment Edited] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769893#comment-17769893 ] koert kuipers edited comment on SPARK-45282 at 9/28/23 4:07 AM: yes i can reproduce it. master branch on commit: {code:java} commit 7e8aafd2c0f1f6fcd03a69afe2b85fd3fda95d20 (HEAD -> master, upstream/master) Author: lanmengran1 Date: Tue Sep 26 21:01:02 2023 -0500 [SPARK-45334][SQL] Remove misleading comment in parquetSchemaConverter {code} i build spark for k8s using: {code:java} $ dev/make-distribution.sh --name kubernetes --tgz -Pkubernetes -Phadoop-cloud {code} created docker container using Dockerfile provided in resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile launch pod and shell inside: {code:java} 185@proxy:~/work-dir$ export SPARK_LOCAL_HOSTNAME=$(hostname -i 185@proxy:~/work-dir$ export SPARK_PUBLIC_DNS=$(hostname -i) 185@proxy:~/work-dir$ /opt/spark/bin/spark-shell --master k8s://https://kubernetes.default:443 --deploy-mode client --num-executors 4 --executor-memory 2G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kubernetes.namespace=default --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=33554432 --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=true --conf spark.kubernetes.container.image=.dkr.ecr.us-east-1.amazonaws.com/spark:4.0.0-SNAPSHOT 23/09/28 03:44:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 4.0.0-SNAPSHOT /_/ Using Scala version 2.13.11 (OpenJDK 64-Bit Server VM, Java 21) Type in expressions to have them evaluated. Type :help for more information. Spark context Web UI available at http://10.177.71.94:4040 Spark context available as 'sc' (master = k8s://https://kubernetes.default:443, app id = spark-5ab0957571944828866a2f23068ff180). Spark session available as 'spark'.scala> :paste // Entering paste mode (ctrl-D to finish)import java.util.UUID import org.apache.spark.sql.functions.col import spark.implicits._ val data = (1 to 100).toDS().map(i => UUID.randomUUID().toString).persist() val left = data.map(k => (k, 1)) val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works! println("number of left " + left.count()) println("number of right " + right.count()) println("number of (left join right) " + left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count() ) val left1 = left .toDF("key", "vertex") .repartition(col("key")) // comment out this line to make it work .persist() println("number of left1 " + left1.count()) val right1 = right .toDF("key", "state") .repartition(col("key")) // comment out this line to make it work .persist() println("number of right1 " + right1.count()) println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result // Exiting paste mode, now interpreting. 23/09/28 03:45:30 WARN TaskSetManager: Stage 0 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. 23/09/28 03:45:34 WARN TaskSetManager: Stage 1 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of left 100 23/09/28 03:45:36 WARN TaskSetManager: Stage 4 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of right 100 23/09/28 03:45:39 WARN TaskSetManager: Stage 7 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. 23/09/28 03:45:40 WARN TaskSetManager: Stage 8 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of (left join right) 100 23/09/28 03:45:45 WARN TaskSetManager: Stage 16 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of left1 100 23/09/28 03:45:48 WARN TaskSetManager: Stage 24 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of right1 100 number of (left1 join right1) 850735 import
[jira] [Comment Edited] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769893#comment-17769893 ] koert kuipers edited comment on SPARK-45282 at 9/28/23 4:06 AM: yes i can reproduce it. master branch on commit: {code:java} commit 7e8aafd2c0f1f6fcd03a69afe2b85fd3fda95d20 (HEAD -> master, upstream/master) Author: lanmengran1 Date: Tue Sep 26 21:01:02 2023 -0500 [SPARK-45334][SQL] Remove misleading comment in parquetSchemaConverter {code} i build spark for k8s using: {code:java} $ dev/make-distribution.sh --name kubernetes --tgz -Pkubernetes -Phadoop-cloud {code} created docker container using Dockerfile provided in resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile launch pod and shell inside: {code:java} 185@proxy:~/work-dir$ export SPARK_LOCAL_HOSTNAME=$(hostname -i 185@proxy:~/work-dir$ export SPARK_PUBLIC_DNS=$(hostname -i) 185@proxy:~/work-dir$ /opt/spark/bin/spark-shell --master k8s://https://kubernetes.default:443 --deploy-mode client --num-executors 4 --executor-memory 2G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kubernetes.namespace=default --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=33554432 --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=true --conf spark.kubernetes.container.image=.dkr.ecr.us-east-1.amazonaws.com/spark:4.0.0-SNAPSHOT 23/09/28 03:44:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 4.0.0-SNAPSHOT /_/ Using Scala version 2.13.11 (OpenJDK 64-Bit Server VM, Java 21) Type in expressions to have them evaluated. Type :help for more information. Spark context Web UI available at http://10.177.71.94:4040 Spark context available as 'sc' (master = k8s://https://kubernetes.default:443, app id = spark-5ab0957571944828866a2f23068ff180). Spark session available as 'spark'.scala> :paste // Entering paste mode (ctrl-D to finish)import java.util.UUID import org.apache.spark.sql.functions.col import spark.implicits._ val data = (1 to 100).toDS().map(i => UUID.randomUUID().toString).persist() val left = data.map(k => (k, 1)) val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works! println("number of left " + left.count()) println("number of right " + right.count()) println("number of (left join right) " + left.toDF("key", "vertex").join(right.toDF("key", "state"), "key").count() ) val left1 = left .toDF("key", "vertex") .repartition(col("key")) // comment out this line to make it work .persist() println("number of left1 " + left1.count()) val right1 = right .toDF("key", "state") .repartition(col("key")) // comment out this line to make it work .persist() println("number of right1 " + right1.count()) println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result // Exiting paste mode, now interpreting. 23/09/28 03:45:30 WARN TaskSetManager: Stage 0 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. 23/09/28 03:45:34 WARN TaskSetManager: Stage 1 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of left 100 23/09/28 03:45:36 WARN TaskSetManager: Stage 4 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of right 100 23/09/28 03:45:39 WARN TaskSetManager: Stage 7 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. 23/09/28 03:45:40 WARN TaskSetManager: Stage 8 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of (left join right) 100 23/09/28 03:45:45 WARN TaskSetManager: Stage 16 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of left1 100 23/09/28 03:45:48 WARN TaskSetManager: Stage 24 contains a task of very large size (6631 KiB). The maximum recommended task size is 1000 KiB. number of right1 100 number of (left1 join right1) 850735 import
[jira] [Comment Edited] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768123#comment-17768123 ] koert kuipers edited comment on SPARK-45282 at 9/22/23 7:04 PM: after reverting SPARK-41048 the issue went away. was (Author: koert): after reverting SPARK-41048 the issue went away. so i think this is the cause. > Join loses records for cached datasets > -- > > Key: SPARK-45282 > URL: https://issues.apache.org/jira/browse/SPARK-45282 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1, 3.5.0 > Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or > databricks 13.3 >Reporter: koert kuipers >Priority: Major > Labels: CorrectnessBug, correctness > > we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is > not present on spark 3.3.1. > it only shows up in distributed environment. i cannot replicate in unit test. > however i did get it to show up on hadoop cluster, kubernetes, and on > databricks 13.3 > the issue is that records are dropped when two cached dataframes are joined. > it seems in spark 3.4.1 in queryplan some Exchanges are dropped as an > optimization while in spark 3.3.1 these Exhanges are still present. it seems > to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true. > to reproduce on distributed cluster these settings needed are: > {code:java} > spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432 > spark.sql.adaptive.coalescePartitions.parallelismFirst false > spark.sql.adaptive.enabled true > spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code} > code using scala to reproduce is: > {code:java} > import java.util.UUID > import org.apache.spark.sql.functions.col > import spark.implicits._ > val data = (1 to 100).toDS().map(i => > UUID.randomUUID().toString).persist() > val left = data.map(k => (k, 1)) > val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works! > println("number of left " + left.count()) > println("number of right " + right.count()) > println("number of (left join right) " + > left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count() > ) > val left1 = left > .toDF("key", "value1") > .repartition(col("key")) // comment out this line to make it work > .persist() > println("number of left1 " + left1.count()) > val right1 = right > .toDF("key", "value2") > .repartition(col("key")) // comment out this line to make it work > .persist() > println("number of right1 " + right1.count()) > println("number of (left1 join right1) " + left1.join(right1, > "key").count()) // this gives incorrect result{code} > this produces the following output: > {code:java} > number of left 100 > number of right 100 > number of (left join right) 100 > number of left1 100 > number of right1 100 > number of (left1 join right1) 859531 {code} > note that the last number (the incorrect one) actually varies depending on > settings and cluster size etc. > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org