This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 9353437ce11 [SPARK-40002][SQL] Don't push down limit through window using ntile 9353437ce11 is described below commit 9353437ce11f646373d557f2c24aeb381b2aeec5 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Tue Aug 9 11:39:58 2022 +0900 [SPARK-40002][SQL] Don't push down limit through window using ntile ### What changes were proposed in this pull request? Change `LimitPushDownThroughWindow` so that it no longer supports pushing down a limit through a window using ntile. ### Why are the changes needed? In an unpartitioned window, the ntile function is currently applied to the result of the limit. This behavior produces results that conflict with Spark 3.1.3, Hive 2.3.9 and Prestodb 0.268. #### Example Assume this data: ``` create table t1 stored as parquet as select * from range(101); ``` Also assume this query: ``` select id, ntile(10) over (order by id) as nt from t1 limit 10; ``` With Spark 3.2.2, Spark 3.3.0, and master, the limit is applied before the ntile function: ``` +---+---+ |id |nt | +---+---+ |0 |1 | |1 |2 | |2 |3 | |3 |4 | |4 |5 | |5 |6 | |6 |7 | |7 |8 | |8 |9 | |9 |10 | +---+---+ ``` With Spark 3.1.3, Hive 2.3.9, and Prestodb 0.268, the limit is applied _after_ ntile. Spark 3.1.3: ``` +---+---+ |id |nt | +---+---+ |0 |1 | |1 |1 | |2 |1 | |3 |1 | |4 |1 | |5 |1 | |6 |1 | |7 |1 | |8 |1 | |9 |1 | +---+---+ ``` Hive 2.3.9: ``` +-----+-----+ | id | nt | +-----+-----+ | 0 | 1 | | 1 | 1 | | 2 | 1 | | 3 | 1 | | 4 | 1 | | 5 | 1 | | 6 | 1 | | 7 | 1 | | 8 | 1 | | 9 | 1 | +-----+-----+ 10 rows selected (1.72 seconds) ``` Prestodb 0.268: ``` id | nt ----+---- 0 | 1 1 | 1 2 | 1 3 | 1 4 | 1 5 | 1 6 | 1 7 | 1 8 | 1 9 | 1 (10 rows) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Two new unit tests. Closes #37443 from bersprockets/pushdown_ntile. 
Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit c9156e5a3b9cb290c7cdda8db298c9875e67aa5e) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/catalyst/optimizer/LimitPushDownThroughWindow.scala | 5 ++--- .../optimizer/LimitPushdownThroughWindowSuite.scala | 13 ++++++++++++- .../apache/spark/sql/DataFrameWindowFunctionsSuite.scala | 13 +++++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala index 635434741b9..88f92262dcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW} @@ -33,8 +33,7 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] { // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW. 
private def supportsPushdownThroughWindow( windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall { - case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber, - WindowSpecDefinition(Nil, _, + case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, WindowSpecDefinition(Nil, _, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala index b09d10b2601..99812d20bf5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{CurrentRow, NTile, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -198,4 +198,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest { Optimize.execute(originalQuery.analyze), WithoutOptimize.execute(originalQuery.analyze)) } + + test("SPARK-40002: Should not push through ntile window function") { + val originalQuery = testRelation + .select(a, b, c, + windowExpr(new NTile(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("nt")) + .limit(2) + + comparePlans( + 
Optimize.execute(originalQuery.analyze), + WithoutOptimize.execute(originalQuery.analyze)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 557b278f9c4..e57650ff629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -1203,4 +1203,17 @@ class DataFrameWindowFunctionsSuite extends QueryTest ) ) } + + test("SPARK-40002: ntile should apply before limit") { + val df = Seq.tabulate(101)(identity).toDF("id") + val w = Window.orderBy("id") + checkAnswer( + df.select($"id", ntile(10).over(w)).limit(3), + Seq( + Row(0, 1), + Row(1, 1), + Row(2, 1) + ) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org