[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427089#comment-17427089 ]
Wenchen Fan commented on SPARK-36877:
-------------------------------------

> shouldn't it reuse the result from previous stages?

One DataFrame means one query, and today Spark can't reuse shuffle/broadcast/subquery results across queries.

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-36877
>                 URL: https://issues.apache.org/jira/browse/SPARK-36877
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>         Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.....
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, causing a significant performance regression after enabling AQE: once when calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the attached screenshot, you can see that the first 3 stages (0 to 4) were rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause a rerun of the stages? If df.rdd has already partially executed the stages, shouldn't it reuse the result from previous stages?
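A minimal sketch of a possible workaround, not from the original thread: since Spark can't reuse shuffle results across queries, persisting and materializing the DataFrame once should let both the {{df.rdd}} partition-count lookup and the subsequent write read from the cache instead of re-running the join stages. The join shape, app name, and output path below are illustrative stand-ins for the report's expensive query.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("aqe-rerun-workaround").getOrCreate()
import spark.implicits._

// Stand-in for the expensive multi-table/multi-stage join from the report.
val df = spark.range(10).withColumn("id2", $"id")
  .join(spark.range(10), "id")
  .groupBy("id2").count()

// Persist and materialize once, so later actions read from the cache
// instead of re-executing the upstream stages.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()

val numPartitions = df.rdd.getNumPartitions  // served from the cached plan
df.repartition(numPartitions).write.mode("overwrite").orc("/tmp/orc1")  // illustrative path

df.unpersist()
{code}

This trades the rerun for cache storage; whether it fully avoids the AQE-triggered execution of {{df.rdd}} in a given Spark version is worth verifying against the Spark UI before relying on it.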