[ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
----------------------------------
    Description: 
We have a few spark scala jobs that are currently running in production. Most 
jobs typically use Dataset, Dataframes. There is a small code in our custom 
library code, that makes rdd calls example to check if the dataframe is empty: 
df.rdd.getNumPartitions == 0

When I enable aqe for these jobs, this .rdd is converted into a separate job of 
it's own and the entire dag is executed 2x, taking 2x more time. This does not 
happen when AQE is disabled. Why does this happen and what is the best way to 
fix the issue?

 

Sample code to reproduce the issue:

 

 
{code:java}
import org.apache.spark.sql._ 
  case class Record(
    id: Int,
    name: String
 )
 
    val partCount = 4
    val input1 = (0 until 100).map(part => Record(part, "a"))
 
    val input2 = (100 until 110).map(part => Record(part, "c"))
 
    implicit val enc: Encoder[Record] = Encoders.product[Record]
 
    val ds1 = spark.createDataset(
      spark.sparkContext
        .parallelize(input1, partCount)
    )
 
    va


l ds2 = spark.createDataset(
      spark.sparkContext
        .parallelize(input2, partCount)
    )
 
    val ds3 = ds1.join(ds2, Seq("id"))
    val l = ds3.count()
 
    val incomingPartitions = ds3.rdd.getNumPartitions
    log.info(s"Num partitions ${incomingPartitions}")
  {code}
 

Spark UI for the same job with AQE,  !Screenshot 2023-07-11 at 9.36.14 AM.png!

 

Spark UI for the same job without AQE:

 

!Screenshot 2023-07-11 at 9.36.19 AM.png!

  was:
We have a few spark scala jobs that are currently running in production. Most 
jobs typically use Dataset, Dataframes. There is a small code in our custom 
library code, that makes rdd calls example to check if the dataframe is empty: 
df.rdd.getNumPartitions == 0

When I enable aqe for these jobs, this .rdd is converted into a separate job of 
it's own and the entire dag is executed 2x, taking 2x more time. This does not 
happen when AQE is disabled. Why does this happen and what is the best way to 
fix the issue?

 

Sample code to reproduce the issue:

 

 
{code:java}
import org.apache.spark.sql._ 
  case class Record(
    id: Int,
    name: String
 )
 
    val partCount = 4
    val input1 = (0 until 100).map(part => Record(part, "a"))
 
    val input2 = (100 until 110).map(part => Record(part, "c"))
 
    implicit val enc: Encoder[Record] = Encoders.product[Record]
 
    val ds1 = spark.createDataset(
      spark.sparkContext
        .parallelize(input1, partCount)
    )
 
    va


l ds2 = spark.createDataset(
      spark.sparkContext
        .parallelize(input2, partCount)
    )
 
    val ds3 = ds1.join(ds2, Seq("id"))
    val l = ds3.count()
 
    val incomingPartitions = ds3.rdd.getNumPartitions
    log.info(s"Num partitions ${incomingPartitions}")
  {code}
 

Spark UI for the same job with AQE,  !Screenshot 2023-07-11 at 9.36.14 AM.png!

 

 


> Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.
> --------------------------------------------------------------------------
>
>                 Key: SPARK-44378
>                 URL: https://issues.apache.org/jira/browse/SPARK-44378
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Submit
>    Affects Versions: 3.1.2
>            Reporter: Priyanka Raju
>            Priority: Major
>              Labels: aqe
>         Attachments: Screenshot 2023-07-11 at 9.36.14 AM.png, Screenshot 
> 2023-07-11 at 9.36.19 AM.png
>
>
> We have a few spark scala jobs that are currently running in production. Most 
> jobs typically use Dataset, Dataframes. There is a small code in our custom 
> library code, that makes rdd calls example to check if the dataframe is 
> empty: df.rdd.getNumPartitions == 0
> When I enable aqe for these jobs, this .rdd is converted into a separate job 
> of it's own and the entire dag is executed 2x, taking 2x more time. This does 
> not happen when AQE is disabled. Why does this happen and what is the best 
> way to fix the issue?
>  
> Sample code to reproduce the issue:
>  
>  
> {code:java}
> import org.apache.spark.sql._ 
>   case class Record(
>     id: Int,
>     name: String
>  )
>  
>     val partCount = 4
>     val input1 = (0 until 100).map(part => Record(part, "a"))
>  
>     val input2 = (100 until 110).map(part => Record(part, "c"))
>  
>     implicit val enc: Encoder[Record] = Encoders.product[Record]
>  
>     val ds1 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input1, partCount)
>     )
>  
>     va
> l ds2 = spark.createDataset(
>       spark.sparkContext
>         .parallelize(input2, partCount)
>     )
>  
>     val ds3 = ds1.join(ds2, Seq("id"))
>     val l = ds3.count()
>  
>     val incomingPartitions = ds3.rdd.getNumPartitions
>     log.info(s"Num partitions ${incomingPartitions}")
>   {code}
>  
> Spark UI for the same job with AQE,  !Screenshot 2023-07-11 at 9.36.14 AM.png!
>  
> Spark UI for the same job without AQE:
>  
> !Screenshot 2023-07-11 at 9.36.19 AM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to