We have a few Spark Scala jobs running in production. Most of them use
Datasets and DataFrames. A small piece of our custom library code makes
RDD calls, for example to check whether a DataFrame is empty:
df.rdd.getNumPartitions == 0

When I enable AQE for these jobs, the .rdd call is turned into a separate
job of its own and the entire DAG is executed twice, which doubles the run
time. This does not happen when AQE is disabled. Why does this happen, and
what is the best way to fix it?


Sample code to reproduce the issue (also available as a gist:
<https://gist.github.com/priyankar-stripe/0ed4510e4c0681127e2a2628a4ac4221>):

    import org.apache.spark.sql._

    case class Record(
      id: Int,
      name: String
    )

    val partCount = 4
    val input1 = (0 until 100).map(part => Record(part, "a"))
    val input2 = (100 until 110).map(part => Record(part, "c"))

    implicit val enc: Encoder[Record] = Encoders.product[Record]

    val ds1 = spark.createDataset(
      spark.sparkContext.parallelize(input1, partCount)
    )
    val ds2 = spark.createDataset(
      spark.sparkContext.parallelize(input2, partCount)
    )

    val ds3 = ds1.join(ds2, Seq("id"))
    val l = ds3.count()
    val incomingPartitions = ds3.rdd.getNumPartitions
    log.info(s"Num partitions ${incomingPartitions}")
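
For anyone comparing the two runs: a minimal sketch of how AQE can be
toggled around the reproduction above, assuming a spark-shell session where
`spark` is already in scope. The only setting involved is
spark.sql.adaptive.enabled; re-running the whole snippet after each change
is a precaution of mine so that the Datasets are rebuilt under the new
setting.

```scala
// Assumption: running in spark-shell, so `spark` (a SparkSession) exists.

// Run 1: AQE disabled.
spark.conf.set("spark.sql.adaptive.enabled", "false")
// ... paste the reproduction snippet here, then check the Jobs tab in the Spark UI ...

// Run 2: AQE enabled.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// ... paste the reproduction snippet again and compare the number of jobs ...

// Confirm which setting is currently active.
println(spark.conf.get("spark.sql.adaptive.enabled"))
```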


[Screenshot: Spark UI job view with AQE enabled]

[Screenshot: Spark UI job view without AQE]

We use Spark 3.1 in production, but I see the same behavior in Spark 3.2
from the spark-shell as well.


This is causing an unexpected regression when we try to enable AQE for our
jobs in production.

-- 
Regards,
Priyanka
