[ https://issues.apache.org/jira/browse/SPARK-46198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vitaliy Savkin updated SPARK-46198:
-----------------------------------
    Attachment: shuffle.png

> Unexpected Shuffle Read when using cached DataFrame
> ---------------------------------------------------
>
>                 Key: SPARK-46198
>                 URL: https://issues.apache.org/jira/browse/SPARK-46198
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Vitaliy Savkin
>            Priority: Major
>         Attachments: shuffle.png
>
>
> When a computation is based on cached DataFrames, I expect to see no
> Shuffle Reads, but they appear under certain circumstances.
> *Reproduction*
> {code:scala}
> val ctx: SQLContext = // init context
> val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"
>
> def populateAndRead(tag: String): DataFrame = {
>   val path = s"$root/numbers_$tag"
>   // import ctx.implicits._
>   // import org.apache.spark.sql.functions.lit
>   // (0 to 10 * 1000 * 1000)
>   //   .toDF("id")
>   //   .withColumn(tag, lit(tag.toUpperCase))
>   //   .repartition(100)
>   //   .write
>   //   .option("header", "true")
>   //   .mode("ignore")
>   //   .csv(path)
>   ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag + "_id")
> }
>
> val dfa = populateAndRead("a1")
> val dfb = populateAndRead("b1")
> val res =
>   dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
>     .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
>     .cache()
> println(res.count())
> res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
> {code}
> Relevant configs
> {code}
> spark.executor.instances=10
> spark.executor.cores=7
> spark.executor.memory=40g
> spark.executor.memoryOverhead=5g
> spark.shuffle.service.enabled=true
> spark.sql.adaptive.enabled=false
> spark.sql.autoBroadcastJoinThreshold=-1
> {code}
> The Spark plan says that the cache is used:
> {code}
> == Physical Plan ==
> Execute InsertIntoHadoopFsRelationCommand (27)
> +- Coalesce (26)
>    +- InMemoryTableScan (1)
>          +- InMemoryRelation (2)
>                +- Union (25)
>                   :- * SortMergeJoin Inner (13)
>                   :  :- * Sort (7)
>                   :  :  +- Exchange (6)
>                   :  :     +- * Project (5)
>                   :  :        +- * Filter (4)
>                   :  :           +- Scan csv (3)
>                   :  +- * Sort (12)
>                   :     +- Exchange (11)
>                   :        +- * Project (10)
>                   :           +- * Filter (9)
>                   :              +- Scan csv (8)
>                   +- * SortMergeJoin Inner (24)
>                      :- * Sort (18)
>                      :  +- Exchange (17)
>                      :     +- * Project (16)
>                      :        +- * Filter (15)
>                      :           +- Scan csv (14)
>                      +- * Sort (23)
>                         +- Exchange (22)
>                            +- * Project (21)
>                               +- * Filter (20)
>                                  +- Scan csv (19)
> {code}
> But when running on YARN, the csv job has Shuffle Reads.
> !image-2023-12-01-09-27-39-463.png!
> *Additional info*
>  - I was unable to reproduce it with local Spark.
>  - If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join
> conditions are changed to just {{"id"}}, the issue disappears!
>  - This behaviour is stable - it's not a result of failed instances.
> *Production impact*
> Without the cache, saving the data in production takes much longer (30 seconds
> vs 18 seconds). To avoid the Shuffle Reads, we had to add a {{repartition}}
> step before {{cache}} as a workaround, which reduced the time from 18 seconds
> to 10.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
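The repartition-before-cache workaround mentioned under *Production impact* could be sketched as follows. This is a minimal sketch, not the reporter's exact production code: it reuses {{ctx}}, {{root}}, and {{populateAndRead}} from the reproduction above, and the partition count (100) and the choice to partition on the join keys are assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Assumed workaround sketch: repartition each side on its join key before
// caching, so the cached InMemoryRelation is already partitioned the way the
// joins need it and the downstream write does not trigger extra Shuffle Reads.
// The partition count (100) is illustrative, not the production value.
val dfa: DataFrame = populateAndRead("a1").repartition(100, col("a1_id"))
val dfb: DataFrame = populateAndRead("b1").repartition(100, col("b1_id"))

val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .cache()

println(res.count()) // materializes the cache before the write
res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
```

Whether this fully removes the Shuffle Reads depends on the plan Spark produces for the joins; the ticket only reports that it cut the write time from 18 seconds to 10.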