This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cb2f47bf7ec1 [SPARK-46485][SQL] V1Write should not add Sort when not needed cb2f47bf7ec1 is described below commit cb2f47bf7ec14edccee0d7a1ad2b12a47f1fa7d2 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Sat Dec 23 00:08:33 2023 +0800 [SPARK-46485][SQL] V1Write should not add Sort when not needed ### What changes were proposed in this pull request? In `V1Writes`, we try to avoid adding a Sort if the query's output ordering already satisfies the required ordering. However, the code is completely broken, with two issues: - we put `SortOrder` as the child of another `SortOrder` and compare, which always returns false. - once we add a project to do `empty2null`, we change the query's output attribute IDs and the sort order never matches. It's not a big issue, as we still have QO rules to eliminate useless sorts, but https://github.com/apache/spark/pull/44429 exposes this problem because the way we optimize sorts is a bit different. For `V1Writes`, we should always avoid adding a sort even if the number of ordering keys is smaller, so as not to change the user query. ### Why are the changes needed? Fix code mistakes. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? updated test ### Was this patch authored or co-authored using generative AI tooling? no Closes #44458 from cloud-fan/sort. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/datasources/V1Writes.scala | 4 +- .../datasources/V1WriteCommandSuite.scala | 54 ++++++++-------------- 2 files changed, 20 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index b1d2588ede62..d7a8d7aec0b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -102,8 +102,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { val requiredOrdering = write.requiredOrdering.map(_.transform { case a: Attribute => attrMap.getOrElse(a, a) }.asInstanceOf[SortOrder]) - val outputOrdering = query.outputOrdering - val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering) + val outputOrdering = empty2NullPlan.outputOrdering + val orderingMatched = isOrderingMatched(requiredOrdering.map(_.child), outputOrdering) if (orderingMatched) { empty2NullPlan } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 3ca516463d36..ce43edb79c12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -223,24 +223,15 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } // assert the outer most sort in the executed plan - val sort = plan.collectFirst { case s: SortExec => s } - if (enabled) { - // With planned write, optimizer is more efficient and can eliminate the `SORT BY`. 
- assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } else { - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) + ), false, _, _) => true + case _ => false + }, plan) } } } @@ -279,24 +270,15 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write } // assert the outer most sort in the executed plan - val sort = plan.collectFirst { case s: SortExec => s } - if (enabled) { - // With planned write, optimizer is more efficient and can eliminate the `SORT BY`. 
- assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } else { - assert(sort.exists { - case SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) - ), false, _, _) => true - case _ => false - }, plan) - } + assert(plan.collectFirst { + case s: SortExec => s + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) + ), false, _, _) => true + case _ => false + }, plan) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org