Repository: spark Updated Branches: refs/heads/master ea0a5eef2 -> ee3af15fe
[SPARK-22363][SQL][TEST] Add unit test for Window spilling ## What changes were proposed in this pull request? There is already a test using window spilling, but the test coverage is not ideal. In this PR the existing test was fixed and additional cases were added. ## How was this patch tested? Automated: Pass the Jenkins. Author: Gabor Somogyi <gabor.g.somo...@gmail.com> Closes #20022 from gaborgsomogyi/SPARK-22363. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee3af15f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee3af15f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee3af15f Branch: refs/heads/master Commit: ee3af15fea18356a9223d61cfe6aaa98ab4dc733 Parents: ea0a5ee Author: Gabor Somogyi <gabor.g.somo...@gmail.com> Authored: Sun Dec 31 14:47:23 2017 +0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Sun Dec 31 14:47:23 2017 +0800 ---------------------------------------------------------------------- .../sql/DataFrameWindowFunctionsSuite.scala | 44 +++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ee3af15f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index ea725af..01c988e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.sql.{Date, Timestamp} +import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} import 
org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -518,9 +519,46 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0))) } + test("Window spill with less than the inMemoryThreshold") { + val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value") + val window = Window.partitionBy($"key").orderBy($"value") + + withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "2", + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") { + assertNotSpilled(sparkContext, "select") { + df.select($"key", sum("value").over(window)).collect() + } + } + } + + test("Window spill with more than the inMemoryThreshold but less than the spillThreshold") { + val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value") + val window = Window.partitionBy($"key").orderBy($"value") + + withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") { + assertNotSpilled(sparkContext, "select") { + df.select($"key", sum("value").over(window)).collect() + } + } + } + + test("Window spill with more than the inMemoryThreshold and spillThreshold") { + val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", "value") + val window = Window.partitionBy($"key").orderBy($"value") + + withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") { + assertSpilled(sparkContext, "select") { + df.select($"key", sum("value").over(window)).collect() + } + } + } + test("SPARK-21258: complex object in combination with spilling") { // Make sure we trigger the spilling path. 
- withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") { + withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1", + SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") { val sampleSchema = new StructType(). add("f0", StringType). add("f1", LongType). @@ -558,7 +596,9 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ - spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () } + assertSpilled(sparkContext, "select") { + spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () } + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org