[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434515#comment-17434515 ] Shardul Mahadik commented on SPARK-36877:
-

Was able to get around this by re-using the RDD for further DF operations:

{code:scala}
val df = /* some expensive multi-table/multi-stage join */
val rdd = df.rdd
val numPartitions = rdd.getNumPartitions
val dfFromRdd = spark.createDataset(rdd)(df.encoder)
dfFromRdd.repartition(x).write.
{code}

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing
> reruns
> --
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.2, 3.2.1
> Reporter: Shardul Mahadik
> Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, once when calling {{df.rdd}} and again when calling {{df.write}}, causing a significant performance regression.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were rerun (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause a rerun of the stages? If df.rdd has already partially executed the stages, shouldn't it reuse the result from previous stages?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17431069#comment-17431069 ] Wenchen Fan commented on SPARK-36877:
-

You were calling `df3.repartition(5).write`, and `repartition(5)` creates a new DataFrame.
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427825#comment-17427825 ] Shardul Mahadik commented on SPARK-36877:
-

{quote}Getting RDD means the physical plan is finalized. With AQE, finalizing the physical plan means running all the query stages except for the last stage.{quote}
Ack! Makes sense.

{quote}> shouldn't it reuse the result from previous stages?
One DataFrame means one query, and today Spark can't reuse shuffle/broadcast/subquery across queries.{quote}
But isn't this the same DF? I am calling {{df.rdd}} and then {{df.write}} where {{df}} is the same, so it is not across queries.
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427089#comment-17427089 ] Wenchen Fan commented on SPARK-36877:
-

> shouldn't it reuse the result from previous stages?

One DataFrame means one query, and today Spark can't reuse shuffle/broadcast/subquery across queries.
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427087#comment-17427087 ] Wenchen Fan commented on SPARK-36877:
-

> Should calling df.rdd trigger actual job execution when AQE is enabled?

We should. Getting the RDD means the physical plan is finalized. With AQE, finalizing the physical plan means running all the query stages except for the last stage.
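Since finalizing the plan already runs every stage but the last, one pattern that can avoid paying for those stages twice is to cache the DataFrame before touching {{.rdd}}. This is a hedged sketch, not something proposed in this thread: the join, output path, and partition count are placeholders, and whether the cache is actually reused should be verified in the Spark UI.

{code:scala}
// Sketch: persist df so that the first action materializes the cache,
// letting both the .rdd plan finalization and the later write scan
// cached data instead of re-running the expensive joins.
val df = /* some expensive multi-table/multi-stage join */
df.persist()                                  // mark df for caching
df.count()                                    // action that materializes the cache
val numPartitions = df.rdd.getNumPartitions   // finalizing the plan now scans the cache
df.repartition(5).write.mode("overwrite").orc("/tmp/out")  // placeholder sink
df.unpersist()
{code}

The trade-off is the storage cost of the cached intermediate result, which may be significant for wide joins.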
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421939#comment-17421939 ] Hyukjin Kwon commented on SPARK-36877:
--

cc [~maryannxue] too, FYI