Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20019#discussion_r158200860
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala 
---
    @@ -0,0 +1,381 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.sql.{Date, Timestamp}
    +
    +import org.apache.spark.sql.expressions.Window
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.unsafe.types.CalendarInterval
    +
    +/**
    + * Window frame testing for DataFrame API.
    + */
    +class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
    +  import testImplicits._
    +
    +  test("lead/lag with empty data frame") {
    +    val df = Seq.empty[(Int, String)].toDF("key", "value")
    +    val window = Window.partitionBy($"key").orderBy($"value")
    +
    +    checkAnswer(
    +      df.select(
    +        lead("value", 1).over(window),
    +        lag("value", 1).over(window)),
    +      Nil)
    +  }
    +
    +  test("lead/lag with positive offset") {
    +    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", 
"value")
    +    val window = Window.partitionBy($"key").orderBy($"value")
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        lead("value", 1).over(window),
    +        lag("value", 1).over(window)),
    +      Row(1, "3", null) :: Row(1, null, "1") :: Row(2, "4", null) :: 
Row(2, null, "2") :: Nil)
    +  }
    +
    +  test("reverse lead/lag with positive offset") {
    +    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", 
"value")
    +    val window = Window.partitionBy($"key").orderBy($"value".desc)
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        lead("value", 1).over(window),
    +        lag("value", 1).over(window)),
    +      Row(1, "1", null) :: Row(1, null, "3") :: Row(2, "2", null) :: 
Row(2, null, "4") :: Nil)
    +  }
    +
    +  test("lead/lag with negative offset") {
    +    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", 
"value")
    +    val window = Window.partitionBy($"key").orderBy($"value")
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        lead("value", -1).over(window),
    +        lag("value", -1).over(window)),
    +      Row(1, null, "3") :: Row(1, "1", null) :: Row(2, null, "4") :: 
Row(2, "2", null) :: Nil)
    +  }
    +
    +  test("reverse lead/lag with negative offset") {
    +    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4")).toDF("key", 
"value")
    +    val window = Window.partitionBy($"key").orderBy($"value".desc)
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        lead("value", -1).over(window),
    +        lag("value", -1).over(window)),
    +      Row(1, null, "1") :: Row(1, "3", null) :: Row(2, null, "2") :: 
Row(2, "4", null) :: Nil)
    +  }
    +
    +  test("lead/lag with default value") {
    +    val default = "n/a"
    +    val df = Seq((1, "1"), (2, "2"), (1, "3"), (2, "4"), (2, 
"5")).toDF("key", "value")
    +    val window = Window.partitionBy($"key").orderBy($"value")
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        lead("value", 2, default).over(window),
    +        lag("value", 2, default).over(window),
    +        lead("value", -2, default).over(window),
    +        lag("value", -2, default).over(window)),
    +      Row(1, default, default, default, default) :: Row(1, default, 
default, default, default) ::
    +        Row(2, "5", default, default, "5") :: Row(2, default, "2", "2", 
default) ::
    +        Row(2, default, default, default, default) :: Nil)
    +  }
    +
    +  test("rows/range between with empty data frame") {
    +    val df = Seq.empty[(String, Int)].toDF("key", "value")
    +    val window = Window.partitionBy($"key").orderBy($"value")
    +
    +    checkAnswer(
    +      df.select(
    +        'key,
    +        first("value").over(
    +          window.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
    +        first("value").over(
    +          window.rangeBetween(Window.unboundedPreceding, 
Window.unboundedFollowing))),
    +      Nil)
    +  }
    +
    +  test("rows between should accept int/long values as boundary") {
    +    val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, 
"1"), (2147483650L, "2"))
    +      .toDF("key", "value")
    +
    +    checkAnswer(
    +      df.select(
    +        $"key",
    +        count("key").over(
    +          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 
2147483647))),
    +      Seq(Row(1, 3), Row(1, 4), Row(2, 2), Row(3, 2), Row(2147483650L, 1), 
Row(2147483650L, 1))
    +    )
    +
    +    val e = intercept[AnalysisException](
    +      df.select(
    +        $"key",
    +        count("key").over(
    +          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 
2147483648L))))
    +    assert(e.message.contains("Boundary end is not a valid integer: 
2147483648"))
    +  }
    +
    +  test("range between should accept at most one ORDER BY expression when 
unbounded") {
    --- End diff --
    
    Nit: in the test name, "unbounded" should be "bounded" — the at-most-one ORDER BY expression restriction applies to range frames with bounded (non-unbounded) boundaries.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to