[ 
https://issues.apache.org/jira/browse/SPARK-46198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitaliy Savkin updated SPARK-46198:
-----------------------------------
    Attachment: shuffle.png

> Unexpected Shuffle Read when using cached DataFrame
> ---------------------------------------------------
>
>                 Key: SPARK-46198
>                 URL: https://issues.apache.org/jira/browse/SPARK-46198
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Vitaliy Savkin
>            Priority: Major
>         Attachments: shuffle.png
>
>
> When a computation is base on a cached data frames, I expect to see no 
> Shuffle Reads, but it happens under certain circumstances.
> *Reproduction*
> {code:scala}
>     val ctx: SQLContext = // init context 
>     val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"
>     def populateAndRead(tag: String): DataFrame = {
>       val path = s"$root/numbers_$tag"
> //      import ctx.implicits._
> //      import org.apache.spark.sql.functions.lit
> //      (0 to 10 * 1000 * 1000)
> //        .toDF("id")
> //        .withColumn(tag, lit(tag.toUpperCase))
> //        .repartition(100)
> //        .write
> //        .option("header", "true")
> //        .mode("ignore")
> //        .csv(path)
>       ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag 
> + "_id")
>     }
>     val dfa = populateAndRead("a1")
>     val dfb = populateAndRead("b1")
>     val res =
>       dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
>         .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
>         .cache()
>     println(res.count())
>     res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
> {code}
> Relevant configs
> {code:scala}
> spark.executor.instances=10
> spark.executor.cores=7
> spark.executor.memory=40g
> spark.executor.memoryOverhead=5g
> spark.shuffle.service.enabled=true
> spark.sql.adaptive.enabled=false
> spark.sql.autoBroadcastJoinThreshold=-1
> {code}
> Spark Plan says that cache is used
> {code:scala}
> == Physical Plan ==
> Execute InsertIntoHadoopFsRelationCommand (27)
> +- Coalesce (26)
>    +- InMemoryTableScan (1)
>          +- InMemoryRelation (2)
>                +- Union (25)
>                   :- * SortMergeJoin Inner (13)
>                   :  :- * Sort (7)
>                   :  :  +- Exchange (6)
>                   :  :     +- * Project (5)
>                   :  :        +- * Filter (4)
>                   :  :           +- Scan csv  (3)
>                   :  +- * Sort (12)
>                   :     +- Exchange (11)
>                   :        +- * Project (10)
>                   :           +- * Filter (9)
>                   :              +- Scan csv  (8)
>                   +- * SortMergeJoin Inner (24)
>                      :- * Sort (18)
>                      :  +- Exchange (17)
>                      :     +- * Project (16)
>                      :        +- * Filter (15)
>                      :           +- Scan csv  (14)
>                      +- * Sort (23)
>                         +- Exchange (22)
>                            +- * Project (21)
>                               +- * Filter (20)
>                                  +- Scan csv  (19)
> {code}
> But when running on YARN, the csv job has shuffle reads.
> !image-2023-12-01-09-27-39-463.png!
> *Additional info*
>  - I was unable to reproduce it with local Spark.
>  - If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join 
> conditions are changed to just {{{}"id"{}}}, the issue disappears!
>  - This behaviour is stable - it's not a result of failed instances.
> *Production impact*
> Without cache saving data in production takes much longer (30 seconds vs 18 
> seconds). To avoid shuffle reads, we had to add a {{repartition}} step before 
> {{cache}} as a workaround, which reduced time from 18 seconds to 10.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to