Shardul Mahadik created SPARK-36877:
---------------------------------------

             Summary: Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
                 Key: SPARK-36877
                 URL: https://issues.apache.org/jira/browse/SPARK-36877
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.2, 3.2.1
            Reporter: Shardul Mahadik
         Attachments: Screen Shot 2021-09-28 at 09.32.20.png

In one of our jobs we perform the following operation:
{code:scala}
val df = /* some expensive multi-table/multi-stage join */
val numPartitions = df.rdd.getNumPartitions
df.repartition(x).write.....
{code}

With AQE enabled, we found that the expensive stages were being run twice 
causing significant performance regression after enabling AQE; once when 
calling {{df.rdd}} and again when calling {{df.write}}.
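As a mitigation on our side (not a fix for the underlying issue), caching the expensive result before touching {{.rdd}} should ensure the join stages run only once; the later write then reads from the cache. A hedged sketch using the standard {{Dataset.persist}} API ({{df}} stands for the expensive join from the snippet above, and the output path is illustrative):

{code:scala}
import org.apache.spark.storage.StorageLevel

// Materialize the expensive join once so that both the .rdd call
// and the subsequent write reuse the cached result instead of
// recomputing the upstream stages.
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)

val numPartitions = cached.rdd.getNumPartitions

cached.repartition(numPartitions).write.mode("overwrite").orc("/tmp/out")

// Release the cache once the write has finished.
cached.unpersist()
{code}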

A more concrete example:
{code:scala}
scala> sql("SET spark.sql.adaptive.enabled=true")
res0: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> val df1 = spark.range(10).withColumn("id2", $"id")
df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
"id").join(spark.range(10), "id")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df3 = df2.groupBy("id2").count()
df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]

scala> df3.rdd.getNumPartitions
res2: Int = 10

scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
{code}

In the attached screenshot, you can see that the first 3 stages (IDs 0 to 4) were rerun (as IDs 5 to 9).
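My understanding of why {{df.rdd}} kicks off jobs under AQE (not confirmed against the code): with {{spark.sql.adaptive.enabled=true}} the physical plan is wrapped in an {{AdaptiveSparkPlan}} node, and converting to an RDD forces that plan to be finalized, which materializes the intermediate query stages. The wrapper is visible with the standard {{explain}} API:

{code:scala}
// With AQE on, the top node of the physical plan is AdaptiveSparkPlan;
// it reports "isFinalPlan=false" until an action (or df.rdd) forces execution.
scala> df3.explain()
{code}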

I have two questions:
1) Should calling {{df.rdd}} trigger actual job execution when AQE is enabled?
2) Should calling {{df.write}} afterwards rerun the stages? If {{df.rdd}} has already partially executed the stages, shouldn't their results be reused?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
