[ 
https://issues.apache.org/jira/browse/SPARK-37328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lietong Liu updated SPARK-37328:
--------------------------------
    Summary: SPARK-33832 brings the bug that OptimizeSkewedJoin may not work 
since it was applied on whole plan instead of new stage plan  (was: 
SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was 
applied onn whole plan innstead of new stage plan)

> SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was 
> applied on whole plan instead of new stage plan
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37328
>                 URL: https://issues.apache.org/jira/browse/SPARK-37328
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Lietong Liu
>            Priority: Major
>
> Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to 
> queryStagePreparationRules, the place where it is applied has moved from 
> newQueryStage() to reOptimize(). As a result, the plan that 
> OptimizeSkewedJoin runs on changed from the plan of the new stage that is 
> about to be submitted to the whole Spark plan.
> When the skewed join is not in the last stage, OptimizeSkewedJoin may not 
> take effect, because the number of collected shuffle stages is more than 2.
> The following test demonstrates the problem:
>  
>  
> {code:java}
> test("OptimizeSkewJoin may not work") {
>   withSQLConf(
>     SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>     SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
>     SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
>     SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
>     SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
>     withTempView("skewData1", "skewData2", "skewData3") {
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 3 as key1", "id % 3 as value1")
>         .createOrReplaceTempView("skewData1")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key2", "id as value2")
>         .createOrReplaceTempView("skewData2")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key3", "id as value3")
>         .createOrReplaceTempView("skewData3")
>       // The query has two skewed joins in two consecutive stages.
>       val (_, adaptive1) =
>         runAdaptiveAndVerifyResult(
>           """
>             |SELECT key1 FROM skewData1 s1
>             |JOIN skewData2 s2
>             |ON s1.key1 = s2.key2
>             |JOIN skewData3
>             |ON s1.value1 = value3
>             |""".stripMargin)
>       val shuffles1 = collect(adaptive1) {
>         case s: ShuffleExchangeExec => s
>       }
>       assert(shuffles1.size == 4)
>       val smj1 = findTopLevelSortMergeJoin(adaptive1)
>       assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
>     }
>   }
> } {code}
> I'll open a PR shortly to fix this issue.
>  
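To illustrate the description above, here is a minimal toy model in Scala of why the rule may be skipped. It is only a sketch under assumptions: the node types (Plan, ShuffleStage, Exchange, Join) and the object SkewRuleSketch are hypothetical stand-ins and not Spark classes, and the "exactly two shuffle stages" check is inferred from the description rather than copied from the Spark source. The plan shape assumes the three leaf shuffles of the test query have already been turned into stages.

```scala
// Hypothetical toy plan nodes; these are NOT Spark classes.
sealed trait Plan { def children: Seq[Plan] }
// A shuffle that has already been materialized as a query stage.
case class ShuffleStage(id: Int) extends Plan { val children: Seq[Plan] = Nil }
// A shuffle exchange that has not been turned into a stage yet.
case class Exchange(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan {
  val children: Seq[Plan] = Seq(left, right)
}

object SkewRuleSketch {
  // Collect every shuffle stage reachable from the given plan.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other => other.children.flatMap(collectShuffleStages)
  }

  // Assumption: the rule only handles plans with exactly two shuffle stages.
  def ruleApplies(plan: Plan): Boolean = collectShuffleStages(plan).length == 2

  // Plan of the new stage: the first join, about to be shuffled again.
  val newStagePlan: Plan = Exchange(Join(ShuffleStage(1), ShuffleStage(2)))

  // Whole plan: the same new stage joined with a third shuffle stage.
  val wholePlan: Plan = Join(newStagePlan, ShuffleStage(3))
}
```

In this model, SkewRuleSketch.ruleApplies(SkewRuleSketch.newStagePlan) is true because only two shuffle stages are visible, while SkewRuleSketch.ruleApplies(SkewRuleSketch.wholePlan) is false because three are collected, which mirrors the case reported in this issue.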



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
