This is an automated email from the ASF dual-hosted git repository. beliefer pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new feb48dc146d [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window functions haven't the same window frame as the rank-like functions feb48dc146d is described below commit feb48dc146d8a89882875f25115af52e8295dfcc Author: Jiaan Geng <belie...@163.com> AuthorDate: Thu Oct 19 20:16:21 2023 +0800 [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window functions haven't the same window frame as the rank-like functions ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/38799 introduced the group limit of Window for rank-based filter to optimize top-k computation. But it causes a bug if the window expressions contain a non-rank function whose window frame is not the same as `(UnboundedPreceding, CurrentRow)`. Please see the details at https://issues.apache.org/jira/browse/SPARK-45543. ### Why are the changes needed? Fix the bug. ### Does this PR introduce _any_ user-facing change? 'Yes'. ### How was this patch tested? New test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43385 from beliefer/SPARK-45543. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Jiaan Geng <belie...@163.com> (cherry picked from commit d6d4e52ecc3015b41c51bc7e4e122696c76b06ee) Signed-off-by: Jiaan Geng <belie...@163.com> --- .../catalyst/optimizer/InferWindowGroupLimit.scala | 18 +++- .../spark/sql/DataFrameWindowFunctionsSuite.scala | 112 +++++++++++++++++++++ 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala index 261be291463..04204c6a2e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala @@ -52,23 +52,33 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with PredicateHelper { if (limits.nonEmpty) Some(limits.min) else None } - private def support( + /** + * All window expressions should use the same expanding window, so that + * we can safely do the early stop. 
+ */ + private def isExpandingWindow( windowExpression: NamedExpression): Boolean = windowExpression match { - case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, WindowSpecDefinition(_, _, + case Alias(WindowExpression(_, WindowSpecDefinition(_, _, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true case _ => false } + private def support(windowFunction: Expression): Boolean = windowFunction match { + case _: Rank | _: DenseRank | _: RowNumber => true + case _ => false + } + def apply(plan: LogicalPlan): LogicalPlan = { if (conf.windowGroupLimitThreshold == -1) return plan plan.transformWithPruning(_.containsAllPatterns(FILTER, WINDOW), ruleId) { case filter @ Filter(condition, window @ Window(windowExpressions, partitionSpec, orderSpec, child)) - if !child.isInstanceOf[WindowGroupLimit] && windowExpressions.exists(support) && + if !child.isInstanceOf[WindowGroupLimit] && windowExpressions.forall(isExpandingWindow) && orderSpec.nonEmpty => val limits = windowExpressions.collect { - case alias @ Alias(WindowExpression(rankLikeFunction, _), _) if support(alias) => + case alias @ Alias(WindowExpression(rankLikeFunction, _), _) + if support(rankLikeFunction) => extractLimits(condition, alias.toAttribute).map((_, rankLikeFunction)) }.flatten diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index a57e927ba84..47380db4217 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -1521,4 +1521,116 @@ class DataFrameWindowFunctionsSuite extends QueryTest assert(windows.size === 1) } } + + test("SPARK-45543: InferWindowGroupLimit causes bug " + + "if the other window functions haven't the same window frame as the rank-like functions") { + val df = Seq( + (1, "Dave", 1, 2020), + (2, 
"Dave", 1, 2021), + (3, "Dave", 2, 2022), + (4, "Dave", 3, 2023), + (5, "Dave", 3, 2024), + (6, "Mark", 2, 2022), + (7, "Mark", 3, 2023), + (8, "Mark", 3, 2024), + (9, "Amy", 6, 2021), + (10, "Amy", 5, 2022), + (11, "Amy", 6, 2023), + (12, "Amy", 7, 2024), + (13, "John", 7, 2024)).toDF("id", "name", "score", "year") + + val window = Window.partitionBy($"year").orderBy($"score".desc) + val window2 = window.rowsBetween(Window.unboundedPreceding, Window.currentRow) + val window3 = window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + + Seq(-1, 100).foreach { threshold => + withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) { + // The other window functions have the same window frame as the rank-like functions. + // df2, df3 and df4 can apply InferWindowGroupLimit + val df2 = df + .withColumn("rn", row_number().over(window)) + .withColumn("all_scores", collect_list($"score").over(window2)) + .sort($"year") + + checkAnswer(df2.filter("rn=1"), Seq( + Row(1, "Dave", 1, 2020, 1, Array(1)), + Row(9, "Amy", 6, 2021, 1, Array(6)), + Row(10, "Amy", 5, 2022, 1, Array(5)), + Row(11, "Amy", 6, 2023, 1, Array(6)), + Row(12, "Amy", 7, 2024, 1, Array(7)) + )) + + val df3 = df + .withColumn("rank", rank().over(window)) + .withColumn("all_scores", collect_list($"score").over(window2)) + .sort($"year") + + checkAnswer(df3.filter("rank=2"), Seq( + Row(2, "Dave", 1, 2021, 2, Array(6, 1)), + Row(3, "Dave", 2, 2022, 2, Array(5, 2)), + Row(6, "Mark", 2, 2022, 2, Array(5, 2, 2)), + Row(4, "Dave", 3, 2023, 2, Array(6, 3)), + Row(7, "Mark", 3, 2023, 2, Array(6, 3, 3)) + )) + + val df4 = df + .withColumn("rank", dense_rank().over(window)) + .withColumn("all_scores", collect_list($"score").over(window2)) + .sort($"year") + + checkAnswer(df4.filter("rank=2"), Seq( + Row(2, "Dave", 1, 2021, 2, Array(6, 1)), + Row(3, "Dave", 2, 2022, 2, Array(5, 2)), + Row(6, "Mark", 2, 2022, 2, Array(5, 2, 2)), + Row(4, "Dave", 3, 2023, 2, Array(6, 3)), + Row(7, 
"Mark", 3, 2023, 2, Array(6, 3, 3)), + Row(5, "Dave", 3, 2024, 2, Array(7, 7, 3)), + Row(8, "Mark", 3, 2024, 2, Array(7, 7, 3, 3)) + )) + + // The other window functions haven't the same window frame as the rank-like functions. + // df5, df6 and df7 cannot apply InferWindowGroupLimit + val df5 = df + .withColumn("rn", row_number().over(window)) + .withColumn("all_scores", collect_list($"score").over(window3)) + .sort($"year") + + checkAnswer(df5.filter("rn=1"), Seq( + Row(1, "Dave", 1, 2020, 1, Array(1)), + Row(9, "Amy", 6, 2021, 1, Array(6, 1)), + Row(10, "Amy", 5, 2022, 1, Array(5, 2, 2)), + Row(11, "Amy", 6, 2023, 1, Array(6, 3, 3)), + Row(12, "Amy", 7, 2024, 1, Array(7, 7, 3, 3)) + )) + + val df6 = df + .withColumn("rank", rank().over(window)) + .withColumn("all_scores", collect_list($"score").over(window3)) + .sort($"year") + + checkAnswer(df6.filter("rank=2"), Seq( + Row(2, "Dave", 1, 2021, 2, Array(6, 1)), + Row(3, "Dave", 2, 2022, 2, Array(5, 2, 2)), + Row(6, "Mark", 2, 2022, 2, Array(5, 2, 2)), + Row(4, "Dave", 3, 2023, 2, Array(6, 3, 3)), + Row(7, "Mark", 3, 2023, 2, Array(6, 3, 3)) + )) + + val df7 = df + .withColumn("rank", dense_rank().over(window)) + .withColumn("all_scores", collect_list($"score").over(window3)) + .sort($"year") + + checkAnswer(df7.filter("rank=2"), Seq( + Row(2, "Dave", 1, 2021, 2, Array(6, 1)), + Row(3, "Dave", 2, 2022, 2, Array(5, 2, 2)), + Row(6, "Mark", 2, 2022, 2, Array(5, 2, 2)), + Row(4, "Dave", 3, 2023, 2, Array(6, 3, 3)), + Row(7, "Mark", 3, 2023, 2, Array(6, 3, 3)), + Row(5, "Dave", 3, 2024, 2, Array(7, 7, 3, 3)), + Row(8, "Mark", 3, 2024, 2, Array(7, 7, 3, 3)) + )) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org