This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e9cc1024df4 [SPARK-39867][SQL] Global limit should not inherit 
OrderPreservingUnaryNode
e9cc1024df4 is described below

commit e9cc1024df4d587a0f456842d495db91984ed9db
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Wed Aug 3 11:59:22 2022 +0800

    [SPARK-39867][SQL] Global limit should not inherit OrderPreservingUnaryNode
    
    ### What changes were proposed in this pull request?
    
    Make GlobalLimit inherit UnaryNode rather than OrderPreservingUnaryNode
    
    ### Why are the changes needed?
    
    Global limit cannot promise that its output ordering is the same as its 
child's; it actually depends on the chosen physical plan.
    
    For all physical plans with global limits:
    - CollectLimitExec: it does not promise output ordering
    - GlobalLimitExec: it requires all tuples, so it can assume the child is 
either a shuffle or a single partition. Then it can use the output ordering of the child
    - TakeOrderedAndProjectExec: it does the sort inside its implementation
    
    This bug got worse since we pulled out the required ordering of v1 writes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    fix test and add test
    
    Closes #37284 from ulysses-you/sort.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/plans/logical/basicLogicalOperators.scala    |  7 ++++++-
 .../sql/catalyst/optimizer/EliminateSortsSuite.scala      | 15 ++++++++++-----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ef5f87b23ec..5d288dc323f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1448,8 +1448,13 @@ object Limit {
  * A global (coordinated) limit. This operator can emit at most `limitExpr` 
number in total.
  *
  * See [[Limit]] for more information.
+ *
+ * Note that, we can not make it inherit [[OrderPreservingUnaryNode]] due to 
the different strategy
+ * of physical plan. The output ordering of child will be broken if a shuffle 
exchange comes in
+ * between the child and global limit, due to the fact that shuffle reader 
fetches blocks in random
+ * order.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends 
OrderPreservingUnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends 
UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 9dcc9f89790..edd840d63f9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -116,10 +116,10 @@ class EliminateSortsSuite extends AnalysisTest {
 
   test("SPARK-33183: remove redundant sort by") {
     val orderedPlan = testRelation.select($"a", $"b").orderBy($"a".asc, 
$"b".desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.limit(2).select($"a")
+    val unnecessaryReordered = LocalLimit(2, orderedPlan).select($"a")
       .sortBy($"a".asc, $"b".desc_nullsFirst)
     val optimized = Optimize.execute(unnecessaryReordered.analyze)
-    val correctAnswer = orderedPlan.limit(2).select($"a").analyze
+    val correctAnswer = LocalLimit(2, orderedPlan).select($"a").analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -163,11 +163,11 @@ class EliminateSortsSuite extends AnalysisTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("SPARK-33183: limits should not affect order for local sort") {
+  test("SPARK-33183: local limits should not affect order for local sort") {
     val orderedPlan = testRelation.select($"a", $"b").orderBy($"a".asc, 
$"b".desc)
-    val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy($"a".asc, 
$"b".desc)
+    val filteredAndReordered = LocalLimit(10, orderedPlan).sortBy($"a".asc, 
$"b".desc)
     val optimized = Optimize.execute(filteredAndReordered.analyze)
-    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
+    val correctAnswer = LocalLimit(10, orderedPlan).analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -444,4 +444,9 @@ class EliminateSortsSuite extends AnalysisTest {
       .sortBy($"c".asc).analyze
     comparePlans(Optimize.execute(plan3), expected3)
   }
+
+  test("SPARK-39867: Global limit should not inherit 
OrderPreservingUnaryNode") {
+    val plan = testRelation.sortBy($"a".asc).limit(2).sortBy($"a".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to