[ https://issues.apache.org/jira/browse/SPARK-37328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443801#comment-17443801 ]
Apache Spark commented on SPARK-37328:
--------------------------------------

User 'Liulietong' has created a pull request for this issue:
https://github.com/apache/spark/pull/34602

> SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was
> applied on whole plan instead of new stage plan
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-37328
>                 URL: https://issues.apache.org/jira/browse/SPARK-37328
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Lietong Liu
>            Priority: Major
>
> Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to
> queryStagePreparationRules, the point at which OptimizeSkewedJoin is applied
> has moved from newQueryStage() to reOptimize(). As a result, the plan that
> OptimizeSkewedJoin is applied to has changed from the plan of the new stage
> that is about to be submitted to the whole Spark plan.
> When the skewed join is not in the last stage, OptimizeSkewedJoin may not
> take effect because the number of collected shuffleStages is more than 2.
> The following test will prove it:
>
> {code:java}
> test("OptimizeSkewJoin may not work") {
>   withSQLConf(
>     SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>     SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
>     SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
>     SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
>     withTempView("skewData1", "skewData2", "skewData3") {
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 3 as key1", "id % 3 as value1")
>         .createOrReplaceTempView("skewData1")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key2", "id as value2")
>         .createOrReplaceTempView("skewData2")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key3", "id as value3")
>         .createOrReplaceTempView("skewData3")
>       // Query has two skewedJoin in two continuous stages.
>       val (_, adaptive1) =
>         runAdaptiveAndVerifyResult(
>           """
>             |SELECT key1 FROM skewData1 s1
>             |JOIN skewData2 s2
>             |ON s1.key1 = s2.key2
>             |JOIN skewData3
>             |ON s1.value1 = value3
>             |""".stripMargin)
>       val shuffles1 = collect(adaptive1) {
>         case s: ShuffleExchangeExec => s
>       }
>       assert(shuffles1.size == 4)
>       val smj1 = findTopLevelSortMergeJoin(adaptive1)
>       assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
>     }
>   }
> }
> {code}
> I'll open a PR shortly to fix this issue
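
The shuffle-stage count check described in the issue can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Plan`, `ShuffleStage`, `Exchange`, `Join`, and `SkewJoinSketch` are hypothetical stand-ins and not Spark classes, and the "exactly two shuffle stages" guard is modeled on the issue's description rather than copied from Spark's source. The plan shapes are likewise one plausible reading of the three-table query in the test.

```scala
// Toy model only: hypothetical stand-ins for physical plan nodes, not Spark classes.
sealed trait Plan { def children: Seq[Plan] }
final case class ShuffleStage(id: Int) extends Plan {
  def children: Seq[Plan] = Nil
}
final case class Exchange(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Join(left: Plan, right: Plan, isSkewJoin: Boolean = false) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object SkewJoinSketch {
  // Collect every materialized shuffle stage below the given plan node.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case stage: ShuffleStage => Seq(stage)
    case other => other.children.flatMap(collectShuffleStages)
  }

  // Mark joins whose two inputs are both shuffle stages as skew joins.
  private def markJoins(plan: Plan): Plan = plan match {
    case Join(l: ShuffleStage, r: ShuffleStage, _) => Join(l, r, isSkewJoin = true)
    case Join(l, r, skew) => Join(markJoins(l), markJoins(r), skew)
    case Exchange(c) => Exchange(markJoins(c))
    case stage: ShuffleStage => stage
  }

  // Assumed guard: the rule gives up unless it sees exactly two shuffle stages.
  def optimizeSkewedJoin(plan: Plan): Plan =
    if (collectShuffleStages(plan).length == 2) markJoins(plan) else plan

  // Count the joins that ended up marked as skew joins.
  def skewJoinCount(plan: Plan): Int = plan match {
    case j: Join => (if (j.isSkewJoin) 1 else 0) + j.children.map(skewJoinCount).sum
    case other => other.children.map(skewJoinCount).sum
  }

  // Plan of the stage about to be submitted: the first join plus its output shuffle.
  val newStage: Plan = Exchange(Join(ShuffleStage(1), ShuffleStage(2)))
  // Whole plan: the second join also sees the third table's shuffle stage.
  val wholePlan: Plan = Join(newStage, ShuffleStage(3))
}
```

In this model, applying `optimizeSkewedJoin` to `newStage` marks the inner join, while applying it to `wholePlan` leaves the plan untouched because three shuffle stages are collected. That matches the behavior change the issue attributes to moving the rule from newQueryStage() to reOptimize().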