[ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434515#comment-17434515
 ] 

Shardul Mahadik commented on SPARK-36877:
-----------------------------------------

Was able to get around this by re-using the RDD for further DF operations
{code:scala}
val df = /* some expensive multi-table/multi-stage join */
val rdd = df.rdd
val numPartitions = rdd.getNumPartitions
val dfFromRdd = spark.createDataset(rdd)(df.encoder)
dfFromRdd.repartition(x).write.....
{code}

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing 
> reruns
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-36877
>                 URL: https://issues.apache.org/jira/browse/SPARK-36877
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>         Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.....
> {code}
> With AQE enabled, we found that the expensive stages were being run twice 
> causing significant performance regression after enabling AQE; once when 
> calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10                                                    (0 + 16) / 
> 16]
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to