[ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
----------------------------------
    Attachment: image2.png

> Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.
> --------------------------------------------------------------------------
>
>                 Key: SPARK-44378
>                 URL: https://issues.apache.org/jira/browse/SPARK-44378
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Submit
>    Affects Versions: 3.1.2
>            Reporter: Priyanka Raju
>            Priority: Major
>              Labels: aqe
>         Attachments: Screenshot 2023-07-11 at 9.36.14 AM.png, Screenshot 
> 2023-07-11 at 9.36.19 AM.png, image2.png
>
>
> We have a few Spark Scala jobs currently running in production. Most of 
> them use Datasets and DataFrames. A small piece of our custom library code 
> makes .rdd calls, for example to check whether a DataFrame is empty: 
> df.rdd.getNumPartitions == 0
> When I enable AQE for these jobs, this .rdd call is turned into a separate 
> job of its own and the entire DAG is executed twice, taking twice as long. 
> This does not happen when AQE is disabled. Why does this happen, and what is 
> the best way to fix the issue?
>  
> Sample code to reproduce the issue:
>  
>  
> {code:java}
> import org.apache.spark.sql._
>  
>     case class Record(
>       id: Int,
>       name: String
>     )
>  
>     val partCount = 4
>     val input1 = (0 until 100).map(part => Record(part, "a"))
>  
>     val input2 = (100 until 110).map(part => Record(part, "c"))
>  
>     implicit val enc: Encoder[Record] = Encoders.product[Record]
>  
>     val ds1 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input1, partCount)
>     )
>  
>     val ds2 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input2, partCount)
>     )
>  
>     val ds3 = ds1.join(ds2, Seq("id"))
>     val l = ds3.count()
>  
>     val incomingPartitions = ds3.rdd.getNumPartitions
>     log.info(s"Num partitions ${incomingPartitions}")
>   {code}
>  
> Spark UI for the same job with AQE:
>  
> !Screenshot 2023-07-11 at 9.36.14 AM.png!
>  
> Spark UI for the same job without AQE:
>  
> !Screenshot 2023-07-11 at 9.36.19 AM.png!
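>  
> For the emptiness check mentioned above, here is a minimal sketch that does 
> not go through .rdd. It assumes Spark 2.4.0 or later, where Dataset.isEmpty 
> is available. The helper name isEmptyWithoutRdd is hypothetical, and whether 
> this avoids the duplicated execution under AQE has not been verified here:
>  
> {code:java}
> import org.apache.spark.sql.Dataset
>  
> // Hypothetical helper (not part of the library described above).
> // Uses Dataset.isEmpty instead of df.rdd.getNumPartitions == 0.
> // Note: isEmpty triggers a Spark job of its own.
> def isEmptyWithoutRdd[T](ds: Dataset[T]): Boolean =
>   ds.isEmpty
> {code}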



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
