[ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17953403#comment-17953403
 ] 

Vindhya G commented on SPARK-44378:
-----------------------------------

The reason seems to be this 
[https://github.com/apache/spark/blob/e2f4f5b9a48af94b20b8224780a8b2c2981da758/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L213]
 which is checking how much partitions is needed based on the data size.  When 
AQE is enabled, it tries to reduce the partitions of the resulting RDD based on 
data size leading to extra job. With the config   
{*}spark.sql.adaptive.coalescePartitions.enabled false{*},  partitions remained 
the same but still the job is taking considerable time locally.  But with 
playing around the value of  
*spark.sql.shuffle.partitions* values lesser than 200 (4 for the above example 
considering the data size) I could see the difference in the time taken. May be 
you can give a try to see if it optimises your jobs.  

> Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.
> --------------------------------------------------------------------------
>
>                 Key: SPARK-44378
>                 URL: https://issues.apache.org/jira/browse/SPARK-44378
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Submit
>    Affects Versions: 3.1.2
>            Reporter: Priyanka Raju
>            Priority: Major
>              Labels: aqe
>         Attachments: Screenshot 2023-07-11 at 9.36.14 AM.png, Screenshot 
> 2023-07-11 at 9.36.19 AM.png, image2.png
>
>
> We have a few spark scala jobs that are currently running in production. Most 
> jobs typically use Dataset, Dataframes. There is a small code in our custom 
> library code, that makes rdd calls example to check if the dataframe is 
> empty: df.rdd.getNumPartitions == 0
> When I enable aqe for these jobs, this .rdd is converted into a separate job 
> of it's own and the entire dag is executed 2x, taking 2x more time. This does 
> not happen when AQE is disabled. Why does this happen and what is the best 
> way to fix the issue?
>  
> Sample code to reproduce the issue:
>  
>  
> {code:java}
> import org.apache.spark.sql._ 
>   case class Record(
>     id: Int,
>     name: String
>  )
>  
>     val partCount = 4
>     val input1 = (0 until 100).map(part => Record(part, "a"))
>  
>     val input2 = (100 until 110).map(part => Record(part, "c"))
>  
>     implicit val enc: Encoder[Record] = Encoders.product[Record]
>  
>     val ds1 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input1, partCount)
>     )
>  
>     va
> l ds2 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input2, partCount)
>     )
>  
>     val ds3 = ds1.join(ds2, Seq("id"))
>     val l = ds3.count()
>  
>     val incomingPartitions = ds3.rdd.getNumPartitions
>     log.info(s"Num partitions ${incomingPartitions}")
>   {code}
>  
> Spark UI for the same job with AQE,  !Screenshot 2023-07-11 at 9.36.14 AM.png!
>  
> Spark UI for the same job without AQE:
>  
> !Screenshot 2023-07-11 at 9.36.19 AM.png!
>  
> This is causing unexpected regression in our jobs when we try to enable AQE 
> for our jobs in production. We use spark 3.1 in production, but I can see the 
> same behavior in spark 3.2 from the console as well
>  
> !image2.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to