Vitaliy Savkin created SPARK-46198:
--------------------------------------

             Summary: Unexpected Shuffle Read when using cached DataFrame
                 Key: SPARK-46198
                 URL: https://issues.apache.org/jira/browse/SPARK-46198
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: Vitaliy Savkin
         Attachments: shuffle.png
When a computation is based on a cached DataFrame, I expect to see no shuffle reads, but they happen under certain circumstances.

*Reproduction*
{code:scala}
val ctx: SQLContext = // init context
val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"

def populateAndRead(tag: String): DataFrame = {
  val path = s"$root/numbers_$tag"
  // One-time population of the test data, commented out after the first run:
  // import ctx.implicits._
  // import org.apache.spark.sql.functions.lit
  // (0 to 10 * 1000 * 1000)
  //   .toDF("id")
  //   .withColumn(tag, lit(tag.toUpperCase))
  //   .repartition(100)
  //   .write
  //   .option("header", "true")
  //   .mode("ignore")
  //   .csv(path)
  ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag + "_id")
}

val dfa = populateAndRead("a1")
val dfb = populateAndRead("b1")
val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .cache()
println(res.count())
res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
{code}

Relevant configs:
{code}
spark.executor.instances=10
spark.executor.cores=7
spark.executor.memory=40g
spark.executor.memoryOverhead=5g
spark.shuffle.service.enabled=true
spark.sql.adaptive.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1
{code}

The physical plan says that the cache is used:
{code}
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (27)
+- Coalesce (26)
   +- InMemoryTableScan (1)
      +- InMemoryRelation (2)
         +- Union (25)
            :- * SortMergeJoin Inner (13)
            :  :- * Sort (7)
            :  :  +- Exchange (6)
            :  :     +- * Project (5)
            :  :        +- * Filter (4)
            :  :           +- Scan csv (3)
            :  +- * Sort (12)
            :     +- Exchange (11)
            :        +- * Project (10)
            :           +- * Filter (9)
            :              +- Scan csv (8)
            +- * SortMergeJoin Inner (24)
               :- * Sort (18)
               :  +- Exchange (17)
               :     +- * Project (16)
               :        +- * Filter (15)
               :           +- Scan csv (14)
               +- * Sort (23)
                  +- Exchange (22)
                     +- * Project (21)
                        +- * Filter (20)
                           +- Scan csv (19)
{code}

But when running on YARN, the CSV-writing job has shuffle reads.

!image-2023-12-01-09-27-39-463.png!

*Additional info*
- I was unable to reproduce the issue with local Spark.
- If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join conditions are changed to just {{"id"}}, the issue disappears!
- This behaviour is stable; it is not the result of failed instances.

*Production impact*
Without the cache, saving the data in production takes much longer (30 seconds vs 18 seconds). To avoid the shuffle reads, we had to add a {{repartition}} step before {{cache}} as a workaround, which reduced the time from 18 seconds to 10.
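For reference, the repartition-before-cache workaround can be sketched in local mode as follows. This is a minimal sketch, not the production job: the master, data sizes, partition count, and column names are placeholder assumptions, and the plan check at the end only verifies that the cached relation is scanned.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Placeholder local session; the report's job runs on YARN with AQE disabled.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-46198-workaround-sketch")
  .getOrCreate()
import spark.implicits._

// Tiny stand-ins for the CSV inputs from the reproduction.
val dfa = (0 until 1000).toDF("a1_id")
val dfb = (0 until 1000).toDF("b1_id")

val res = dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
  .repartition(10) // workaround: fix the partitioning before caching
  .cache()

println(res.count()) // first action materializes the cache

// Sanity check: the executed plan should now scan the in-memory relation.
val cacheScans = res.queryExecution.executedPlan.collect {
  case scan: InMemoryTableScanExec => scan
}
println(s"InMemoryTableScan nodes: ${cacheScans.size}")
{code}

In the production job the {{repartition}} call would go immediately before {{cache()}} on the unioned join result, so that the subsequent {{coalesce(1).write}} can consume the cached partitions without extra shuffle reads.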