[jira] [Updated] (SPARK-46198) Unexpected Shuffle Read when using cached DataFrame

2023-12-01 Thread Vitaliy Savkin (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-46198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vitaliy Savkin updated SPARK-46198:
---
Description: 
When a computation is based on a cached data frame, I expect to see no Shuffle 
Reads, but they happen under certain circumstances.

*Reproduction*
{code:scala}
val ctx: SQLContext = // init context 
val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"

def populateAndRead(tag: String): DataFrame = {
  val path = s"$root/numbers_$tag"
//  import ctx.implicits._
//  import org.apache.spark.sql.functions.lit
//  (0 to 10 * 1000 * 1000)
//    .toDF("id")
//    .withColumn(tag, lit(tag.toUpperCase))
//    .repartition(100)
//    .write
//    .option("header", "true")
//    .mode("ignore")
//    .csv(path)

  ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag + "_id")
}

val dfa = populateAndRead("a1")
val dfb = populateAndRead("b1")
val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .cache()

println(res.count())
res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
{code}
Relevant configs:
{code:scala}
spark.executor.instances=10
spark.executor.cores=7
spark.executor.memory=40g
spark.executor.memoryOverhead=5g

spark.shuffle.service.enabled=true
spark.sql.adaptive.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1
{code}
The physical plan confirms that the cache is used:
{code:scala}
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (27)
+- Coalesce (26)
   +- InMemoryTableScan (1)
         +- InMemoryRelation (2)
               +- Union (25)
                  :- * SortMergeJoin Inner (13)
                  :  :- * Sort (7)
                  :  :  +- Exchange (6)
                  :  :     +- * Project (5)
                  :  :        +- * Filter (4)
                  :  :           +- Scan csv  (3)
                  :  +- * Sort (12)
                  :     +- Exchange (11)
                  :        +- * Project (10)
                  :           +- * Filter (9)
                  :              +- Scan csv  (8)
                  +- * SortMergeJoin Inner (24)
                     :- * Sort (18)
                     :  +- Exchange (17)
                     :     +- * Project (16)
                     :        +- * Filter (15)
                     :           +- Scan csv  (14)
                     +- * Sort (23)
                        +- Exchange (22)
                           +- * Project (21)
                              +- * Filter (20)
                                 +- Scan csv  (19)
{code}
But when running on YARN, the CSV write job shows shuffle reads.

!shuffle.png!

*Additional info*
 - I was unable to reproduce it with local Spark.
 - If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join conditions are changed to just {{"id"}}, the issue disappears!
 - This behaviour is stable; it is not caused by failed instances.
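
The second observation above suggests a variant worth comparing: keep the shared column name {{id}} on both sides and join on it by name instead of renaming. This is a hypothetical sketch reconstructed from the reproduction above (paths and names assumed), not a confirmed fix:
{code:scala}
// Hypothetical variant: skip withColumnRenamed and join by column name,
// which was reported to make the unexpected shuffle reads disappear.
val dfa2 = ctx.read.option("header", "true").csv(s"$root/numbers_a1")
val dfb2 = ctx.read.option("header", "true").csv(s"$root/numbers_b1")
// Joining on Seq("id") keeps a single "id" column in the output and
// replaces the renamed-column equality predicate used above.
val joined = dfa2.join(dfb2, Seq("id"))
{code}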

*Production impact*

Without the cache, saving data in production takes much longer (30 seconds vs. 18 seconds). To avoid the shuffle reads, we added a {{repartition}} step before {{cache}} as a workaround, which reduced the write time from 18 seconds to 10.
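
A sketch of that workaround, assuming the same pipeline as in the reproduction (the partition count of 100 is illustrative, not taken from the report):
{code:scala}
// Workaround: repartition before cache so the cached data already has the
// layout needed by the final write; reported to cut write time to ~10s.
val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .repartition(100) // illustrative partition count
    .cache()
{code}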


[jira] [Updated] (SPARK-46198) Unexpected Shuffle Read when using cached DataFrame

2023-11-30 Thread Vitaliy Savkin (Jira)

Vitaliy Savkin updated SPARK-46198:
---
Attachment: shuffle.png

> Unexpected Shuffle Read when using cached DataFrame
> ---
>
> Key: SPARK-46198
> URL: https://issues.apache.org/jira/browse/SPARK-46198
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.1
>Reporter: Vitaliy Savkin
>Priority: Major
> Attachments: shuffle.png


