This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2f47bf7ec1 [SPARK-46485][SQL] V1Write should not add Sort when not needed
cb2f47bf7ec1 is described below

commit cb2f47bf7ec14edccee0d7a1ad2b12a47f1fa7d2
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Sat Dec 23 00:08:33 2023 +0800

    [SPARK-46485][SQL] V1Write should not add Sort when not needed
    
    ### What changes were proposed in this pull request?
    
    In `V1Writes`, we try to avoid adding a Sort if the output ordering already
    satisfies the required ordering. However, the code is completely broken with two issues:
    - we put `SortOrder` as the child of another `SortOrder` and compare, which 
always returns false.
    - once we add a project to do `empty2null`, we change the query output
      attribute IDs, so the sort order never matches.
    
    It's not a big issue as we still have QO rules to eliminate useless sorts,
    but https://github.com/apache/spark/pull/44429 exposes this problem because the
    way we optimize sort is a bit different. For `V1Writes`, we should always avoid
    adding a sort, even if the number of ordering keys is smaller, so as not to change
    the user query.
    
    ### Why are the changes needed?
    
    fix code mistakes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    updated test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44458 from cloud-fan/sort.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/datasources/V1Writes.scala |  4 +-
 .../datasources/V1WriteCommandSuite.scala          | 54 ++++++++--------------
 2 files changed, 20 insertions(+), 38 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index b1d2588ede62..d7a8d7aec0b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -102,8 +102,8 @@ object V1Writes extends Rule[LogicalPlan] with 
SQLConfHelper {
     val requiredOrdering = write.requiredOrdering.map(_.transform {
       case a: Attribute => attrMap.getOrElse(a, a)
     }.asInstanceOf[SortOrder])
-    val outputOrdering = query.outputOrdering
-    val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
+    val outputOrdering = empty2NullPlan.outputOrdering
+    val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), 
outputOrdering)
     if (orderingMatched) {
       empty2NullPlan
     } else {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 3ca516463d36..ce43edb79c12 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -223,24 +223,15 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
           }
 
           // assert the outer most sort in the executed plan
-          val sort = plan.collectFirst { case s: SortExec => s }
-          if (enabled) {
-            // With planned write, optimizer is more efficient and can 
eliminate the `SORT BY`.
-            assert(sort.exists {
-              case SortExec(Seq(
-                SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _)
-              ), false, _, _) => true
-              case _ => false
-            }, plan)
-          } else {
-            assert(sort.exists {
-              case SortExec(Seq(
-                SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _),
-                SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
-              ), false, _, _) => true
-              case _ => false
-            }, plan)
-          }
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+              SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _),
+              SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
         }
       }
     }
@@ -279,24 +270,15 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
         }
 
         // assert the outer most sort in the executed plan
-        val sort = plan.collectFirst { case s: SortExec => s }
-        if (enabled) {
-          // With planned write, optimizer is more efficient and can eliminate 
the `SORT BY`.
-          assert(sort.exists {
-            case SortExec(Seq(
-              SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
-            ), false, _, _) => true
-            case _ => false
-          }, plan)
-        } else {
-          assert(sort.exists {
-            case SortExec(Seq(
-              SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _),
-              SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _)
-            ), false, _, _) => true
-            case _ => false
-          }, plan)
-        }
+        assert(plan.collectFirst {
+          case s: SortExec => s
+        }.exists {
+          case SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _)
+          ), false, _, _) => true
+          case _ => false
+        }, plan)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to