Repository: spark
Updated Branches:
  refs/heads/master ff4bb836a -> a7ab7f234


[SPARK-25845][SQL] Fix MatchError for calendar interval type in range frame left boundary

## What changes were proposed in this pull request?

`WindowSpecDefinition` checks that a range frame's start boundary is not greater than its end boundary, but `CalendarIntervalType` is not comparable, so the check throws the following exception at runtime:

```
scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
  at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:58)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering$lzycompute(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.GreaterThan.nullSafeEval(predicates.scala:797)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:496)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.isGreaterThan(windowExpressions.scala:245)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.checkInputDataTypes(windowExpressions.scala:216)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
  at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
  at scala.collection.mutable.ArrayBuffer.forall(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved$lzycompute(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)
```
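
For reference, a minimal reproduction sketch (adapted from the new test case in the diff below; it assumes a `SparkSession` named `spark` with its implicits in scope, as in `spark-shell`):

```scala
import java.sql.Timestamp

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.unsafe.types.CalendarInterval
import spark.implicits._

def ts(seconds: Long): Timestamp = new Timestamp(seconds * 1000)

val df = Seq((ts(1501545600), "1"), (ts(1502000000), "2")).toDF("key", "value")

// A range frame whose boundaries are CalendarInterval literals. Before this
// fix, resolving the window spec evaluated GreaterThan on the two interval
// boundaries and hit the MatchError shown above.
val window = Window.partitionBy($"value").orderBy($"key")
  .rangeBetween(lit(CalendarInterval.fromString("interval 3 hours")),
    lit(CalendarInterval.fromString("interval 23 days 4 hours")))

df.select($"key", count("key").over(window)).show()
```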

We fix the issue by performing the check only on boundary expressions that are of `AtomicType`; for non-comparable types such as `CalendarIntervalType`, the check is skipped.
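
Conceptually, the guarded comparison looks like the following sketch (it mirrors the actual change in the diff below; `Expression`, `GreaterThan`, and `AtomicType` are existing Catalyst types):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, GreaterThan}
import org.apache.spark.sql.types.AtomicType

// Only atomic types have an interpreted ordering, so GreaterThan can be
// evaluated safely; for non-comparable types such as CalendarIntervalType
// the boundary-order check is skipped (the boundary pair is treated as valid).
def isGreaterThan(l: Expression, r: Expression): Boolean = l.dataType match {
  case _: AtomicType => GreaterThan(l, r).eval().asInstanceOf[Boolean]
  case _ => false
}
```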

## How was this patch tested?

Added a new test case in `DataFrameWindowFramesSuite`.

Closes #22853 from jiangxb1987/windowBoundary.

Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7ab7f23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7ab7f23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7ab7f23

Branch: refs/heads/master
Commit: a7ab7f2348cfcd665f7815f5a9ae4d9a48383b5d
Parents: ff4bb83
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Sun Oct 28 18:15:47 2018 +0800
Committer: Xingbo Jiang <xingbo.ji...@databricks.com>
Committed: Sun Oct 28 18:15:47 2018 +0800

----------------------------------------------------------------------
 .../catalyst/expressions/windowExpressions.scala |  8 ++++++--
 .../spark/sql/DataFrameWindowFramesSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a7ab7f23/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 707f312..7de6ddd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -242,8 +242,12 @@ case class SpecifiedWindowFrame(
     case e: Expression => e.sql + " FOLLOWING"
   }
 
-  private def isGreaterThan(l: Expression, r: Expression): Boolean = {
-    GreaterThan(l, r).eval().asInstanceOf[Boolean]
+  // Check whether the left boundary value is greater than the right boundary value. It's required
+  // that both expressions have the same data type.
+  // Since CalendarIntervalType is not comparable, we only compare expressions that are AtomicType.
+  private def isGreaterThan(l: Expression, r: Expression): Boolean = l.dataType match {
+    case _: AtomicType => GreaterThan(l, r).eval().asInstanceOf[Boolean]
+    case _ => false
   }
 
   private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match {

http://git-wip-us.apache.org/repos/asf/spark/blob/a7ab7f23/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
index 2a0b2b8..9c28074 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
@@ -267,6 +267,25 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  test("range between should accept interval values as both boundaries") {
+    def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
+
+    val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), 
(ts(1609372800), "1"),
+      (ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
+      .toDF("key", "value")
+    val window = Window.partitionBy($"value").orderBy($"key")
+      .rangeBetween(lit(CalendarInterval.fromString("interval 3 hours")),
+        lit(CalendarInterval.fromString("interval 23 days 4 hours")))
+
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(window)),
+      Seq(Row(ts(1501545600), 1), Row(ts(1501545600), 1), Row(ts(1609372800), 
0),
+        Row(ts(1503000000), 0), Row(ts(1502000000), 0), Row(ts(1609372800), 0))
+    )
+  }
+
   test("unbounded rows/range between with aggregation") {
     val df = Seq(("one", 1), ("two", 2), ("one", 3), ("two", 4)).toDF("key", 
"value")
     val window = Window.partitionBy($"key").orderBy($"value")

