This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ea4b7a242910 [SPARK-47991][SQL][TEST] Arrange the test cases for window frames and window functions ea4b7a242910 is described below commit ea4b7a2429106067eb30b6b47bf7c42059053d31 Author: beliefer <belie...@163.com> AuthorDate: Thu Apr 25 20:54:27 2024 -0700 [SPARK-47991][SQL][TEST] Arrange the test cases for window frames and window functions ### What changes were proposed in this pull request? This PR proposes to arrange the test cases for window frames and window functions. ### Why are the changes needed? Currently, `DataFrameWindowFramesSuite` and `DataFrameWindowFunctionsSuite` have different testing objectives. The comments for the above two classes are as follows: `DataFrameWindowFramesSuite` is `Window frame testing for DataFrame API.` `DataFrameWindowFunctionsSuite` is `Window function testing for DataFrame API.` But there are some test cases for window frames placed in `DataFrameWindowFunctionsSuite`. ### Does this PR introduce _any_ user-facing change? 'No'. Just arrange the test cases for window frames and window functions. ### How was this patch tested? GA ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #46226 from beliefer/SPARK-47991. 
Authored-by: beliefer <belie...@163.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/DataFrameWindowFramesSuite.scala | 48 ++++++++++++++++++++++ .../spark/sql/DataFrameWindowFunctionsSuite.scala | 48 ---------------------- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala index fe1393af8174..95f4cc78d156 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala @@ -32,6 +32,28 @@ import org.apache.spark.sql.types.CalendarIntervalType class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession { import testImplicits._ + test("reuse window partitionBy") { + val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") + val w = Window.partitionBy("key").orderBy("value") + + checkAnswer( + df.select( + lead("key", 1).over(w), + lead("value", 1).over(w)), + Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) + } + + test("reuse window orderBy") { + val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") + val w = Window.orderBy("value").partitionBy("key") + + checkAnswer( + df.select( + lead("key", 1).over(w), + lead("value", 1).over(w)), + Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) + } + test("lead/lag with empty data frame") { val df = Seq.empty[(Int, String)].toDF("key", "value") val window = Window.partitionBy($"key").orderBy($"value") @@ -570,4 +592,30 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-34227: WindowFunctionFrame should clear its states during preparation") { + // This creates a single partition dataframe with 3 records: + // "a", 0, null + // "a", 1, "x" + // "b", 0, null + val df = spark.range(0, 
3, 1, 1).select( + when($"id" < 2, lit("a")).otherwise(lit("b")).as("key"), + ($"id" % 2).cast("int").as("order"), + when($"id" % 2 === 0, lit(null)).otherwise(lit("x")).as("value")) + + val window1 = Window.partitionBy($"key").orderBy($"order") + .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + val window2 = Window.partitionBy($"key").orderBy($"order") + .rowsBetween(Window.unboundedPreceding, Window.currentRow) + checkAnswer( + df.select( + $"key", + $"order", + nth_value($"value", 1, ignoreNulls = true).over(window1), + nth_value($"value", 1, ignoreNulls = true).over(window2)), + Seq( + Row("a", 0, "x", null), + Row("a", 1, "x", "x"), + Row("b", 0, null, null))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 4c852711451c..e3aff9b36aec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -44,28 +44,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest import testImplicits._ - test("reuse window partitionBy") { - val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") - val w = Window.partitionBy("key").orderBy("value") - - checkAnswer( - df.select( - lead("key", 1).over(w), - lead("value", 1).over(w)), - Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) - } - - test("reuse window orderBy") { - val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") - val w = Window.orderBy("value").partitionBy("key") - - checkAnswer( - df.select( - lead("key", 1).over(w), - lead("value", 1).over(w)), - Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil) - } - test("rank functions in unspecific window") { withTempView("window_table") { val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value") 
@@ -1156,32 +1134,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest Row(Seq(0.0f, -0.0f), Row(0.0d, Double.NaN), Seq(Row(0.0d, 0.0/0.0)), 2))) } - test("SPARK-34227: WindowFunctionFrame should clear its states during preparation") { - // This creates a single partition dataframe with 3 records: - // "a", 0, null - // "a", 1, "x" - // "b", 0, null - val df = spark.range(0, 3, 1, 1).select( - when($"id" < 2, lit("a")).otherwise(lit("b")).as("key"), - ($"id" % 2).cast("int").as("order"), - when($"id" % 2 === 0, lit(null)).otherwise(lit("x")).as("value")) - - val window1 = Window.partitionBy($"key").orderBy($"order") - .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) - val window2 = Window.partitionBy($"key").orderBy($"order") - .rowsBetween(Window.unboundedPreceding, Window.currentRow) - checkAnswer( - df.select( - $"key", - $"order", - nth_value($"value", 1, ignoreNulls = true).over(window1), - nth_value($"value", 1, ignoreNulls = true).over(window2)), - Seq( - Row("a", 0, "x", null), - Row("a", 1, "x", "x"), - Row("b", 0, null, null))) - } - test("SPARK-38237: require all cluster keys for child required distribution for window query") { def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { expressions.flatMap { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org