This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0c3c819a68a [SPARK-42525][SQL] Collapse two adjacent windows with the same partition/order in subquery 0c3c819a68a is described below commit 0c3c819a68a8fceedcb5974b2f7e30121cd464e6 Author: zml1206 <zhuml1...@gmail.com> AuthorDate: Sun Feb 26 11:21:31 2023 +0800 [SPARK-42525][SQL] Collapse two adjacent windows with the same partition/order in subquery ### What changes were proposed in this pull request? Extend the CollapseWindow rule to also collapse adjacent Window nodes when one of the windows is in a subquery. ### Why are the changes needed? ``` select a, b, c, row_number() over (partition by a order by b) as d from ( select a, b, rank() over (partition by a order by b) as c from t1) t2 == Optimized Logical Plan == before Window [row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST] +- Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25], [a#11], [b#12 ASC NULLS FIRST] +- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12] +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7] +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1517/16288483683a479fda, obj#5: scala.Tuple2 +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long +- *(1) Range (0, 10, step=1, splits=2) after Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25, row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST] +- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12] +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7] +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1518/19280286724d7a64ca, obj#5: scala.Tuple2 +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long +- *(1) Range (0, 10, step=1, splits=2) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #40115 from zml1206/SPARK-42525. Authored-by: zml1206 <zhuml1...@gmail.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 9 +++++++-- .../sql/catalyst/optimizer/CollapseWindowSuite.scala | 20 ++++++++++++++++++++ .../spark/sql/DataFrameWindowFunctionsSuite.scala | 16 ++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1233f2207f5..a0d49c29470 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1251,9 +1251,14 @@ object OptimizeWindowFunctions extends Rule[LogicalPlan] { * independent and are of the same window function type, collapse into the parent. 
*/ object CollapseWindow extends Rule[LogicalPlan] { + private def specCompatible(s1: Seq[Expression], s2: Seq[Expression]): Boolean = { + s1.length == s2.length && + s1.zip(s2).forall(e => e._1.semanticEquals(e._2)) + } + private def windowsCompatible(w1: Window, w2: Window): Boolean = { - w1.partitionSpec == w2.partitionSpec && - w1.orderSpec == w2.orderSpec && + specCompatible(w1.partitionSpec, w2.partitionSpec) && + specCompatible(w1.orderSpec, w2.orderSpec) && w1.references.intersect(w2.windowOutputSet).isEmpty && w1.windowExpressions.nonEmpty && w2.windowExpressions.nonEmpty && // This assumes Window contains the same type of window expressions. This is ensured diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 63cc3554564..515203da7ca 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -148,4 +148,24 @@ class CollapseWindowSuite extends PlanTest { comparePlans(optimized, query) } + + test("SPARK-42525: collapse two adjacent windows with the same partition/order " + + "but qualifiers are different ") { + + val query = testRelation + .window(Seq(min(a).as("_we0")), Seq(c.withQualifier(Seq("0"))), Seq(c.asc)) + .select($"a", $"b", $"c", $"_we0" as "min_a") + .window(Seq(max(a).as("_we1")), Seq(c.withQualifier(Seq("1"))), Seq(c.asc)) + .select($"a", $"b", $"c", $"min_a", $"_we1" as "max_a") + .analyze + + val optimized = Optimize.execute(query) + + val correctAnswer = testRelation + .window(Seq(min(a).as("_we0"), max(a).as("_we1")), Seq(c), Seq(c.asc)) + .select(a, b, c, $"_we0" as "min_a", $"_we1" as "max_a") + .analyze + + comparePlans(optimized, correctAnswer) + } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index b9421f8b13d..1fb937e93b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.matchers.must.Matchers.the import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral} import org.apache.spark.sql.catalyst.optimizer.TransposeWindow +import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} @@ -1429,4 +1430,19 @@ class DataFrameWindowFunctionsSuite extends QueryTest } } } + + test("SPARK-42525: collapse two adjacent windows with the same partition/order in subquery") { + withTempView("t1") { + Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("t1") + val df = sql( + """ + |SELECT a, b, rk, row_number() OVER (PARTITION BY a ORDER BY b) AS rn + |FROM (SELECT a, b, rank() OVER (PARTITION BY a ORDER BY b) AS rk + | FROM t1) t2 + |""".stripMargin) + + val windows = df.queryExecution.optimizedPlan.collect { case w: LogicalWindow => w } + assert(windows.size === 1) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org