This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1e525c1  [SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
1e525c1 is described below

commit 1e525c1156ee5558bda3937751588a50bc132c38
Author: allisonwang-db <66282705+allisonwang...@users.noreply.github.com>
AuthorDate: Fri Nov 20 09:47:54 2020 -0800

    [SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
    
    Backport #30373 for branch-3.0.
    
    ### What changes were proposed in this pull request?
    This PR switched the order for the rule `RemoveRedundantSorts` and 
`EnsureRequirements` so that `EnsureRequirements` will be invoked before 
`RemoveRedundantSorts` to avoid IllegalArgumentException when instantiating 
PartitioningCollection.
    
    ### Why are the changes needed?
    `RemoveRedundantSorts` rule uses SparkPlan's `outputPartitioning` to check 
whether a sort node is redundant. Currently, it is added before 
`EnsureRequirements`. Since `PartitioningCollection` requires left and right 
partitioning to have the same number of partitions, which is not necessarily 
true before applying `EnsureRequirements`, the rule can fail with the following 
exception:
    ```
    IllegalArgumentException: requirement failed: PartitioningCollection 
requires all of its partitionings have the same numPartitions.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    Closes #30438 from allisonwang-db/spark-33472-3.0.
    
    Authored-by: allisonwang-db 
<66282705+allisonwang...@users.noreply.github.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/execution/QueryExecution.scala       |  4 +++-
 .../org/apache/spark/sql/execution/SparkPlan.scala |  7 +++++-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  4 ++--
 .../sql/execution/RemoveRedundantSortsSuite.scala  | 25 ++++++++++++++++++++++
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 574a67f..7f5a5e3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -297,8 +297,10 @@ object QueryExecution {
     Seq(
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
-      RemoveRedundantSorts(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` 
to guarantee the same
+      // number of partitions when instantiating PartitioningCollection.
+      RemoveRedundantSorts(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ead8c00..062aa69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   def longMetric(name: String): SQLMetric = metrics(name)
 
   // TODO: Move to `DistributedPlan`
-  /** Specifies how data is partitioned across different nodes in the cluster. 
*/
+  /**
+   * Specifies how data is partitioned across different nodes in the cluster.
+   * Note this method may fail if it is invoked before `EnsureRequirements` is 
applied
+   * since `PartitioningCollection` requires all its partitionings to have
+   * the same number of partitions.
+   */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG 
WIDTH!
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4e73f06..187827c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -90,8 +90,8 @@ case class AdaptiveSparkPlanExec(
   // plan should reach a final status of query stages (i.e., no more addition 
or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    removeRedundantSorts,
-    ensureRequirements
+    ensureRequirements,
+    removeRedundantSorts
   ) ++ context.session.sessionState.queryStagePrepRules
 
   // A list of physical optimizer rules to be applied to a new stage before 
its execution. These
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 1978d22..9cea812 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, 
UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -99,6 +101,29 @@ abstract class RemoveRedundantSortsSuiteBase
       }
     }
   }
+
+  test("SPARK-33472: shuffled join with different left and right side 
partition numbers") {
+    withTempView("t1", "t2") {
+      spark.range(0, 100, 1, 2).select('id as 
"key").createOrReplaceTempView("t1")
+      (0 to 100).toDF("key").createOrReplaceTempView("t2")
+
+      val query = """
+        |SELECT /*+ MERGE(t1) */ t1.key
+        |FROM t1 JOIN t2 ON t1.key = t2.key
+        |WHERE t1.key > 10 AND t2.key < 50
+        |ORDER BY t1.key ASC
+      """.stripMargin
+
+      val df = sql(query)
+      val sparkPlan = df.queryExecution.sparkPlan
+      val join = sparkPlan.collect { case j: SortMergeJoinExec => j }.head
+      val leftPartitioning = join.left.outputPartitioning
+      assert(leftPartitioning.isInstanceOf[RangePartitioning])
+      assert(leftPartitioning.numPartitions == 2)
+      assert(join.right.outputPartitioning == UnknownPartitioning(0))
+      checkSorts(query, 3, 3)
+    }
+  }
 }
 
 class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to