Repository: spark
Updated Branches:
  refs/heads/master ff4bb836a -> a7ab7f234
[SPARK-25845][SQL] Fix MatchError for calendar interval type in range frame left boundary

## What changes were proposed in this pull request?

WindowSpecDefinition checks that a frame's lower boundary is not greater than its upper boundary, but CalendarIntervalType is not comparable, so the check throws the following exception at runtime:

```
scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
  at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:58)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering$lzycompute(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.GreaterThan.nullSafeEval(predicates.scala:797)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:496)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.isGreaterThan(windowExpressions.scala:245)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.checkInputDataTypes(windowExpressions.scala:216)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
  at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
  at scala.collection.mutable.ArrayBuffer.forall(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved$lzycompute(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)
```

We fix the issue by performing the check only on boundary expressions that are of AtomicType.

## How was this patch tested?

Added a new test case in `DataFrameWindowFramesSuite`.

Closes #22853 from jiangxb1987/windowBoundary.
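For illustration, here is a minimal reproduction modeled on the new test in the diff below. This is a sketch, not code from the patch; it assumes a spark-shell session (so `spark.implicits._` is already in scope) on a build that still provides `CalendarInterval.fromString` and the `rangeBetween(Column, Column)` overload. Before this fix, resolving the window frame evaluated `GreaterThan` on the two interval literals and hit the `MatchError` above; with the fix, the query runs:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.unsafe.types.CalendarInterval

// A timestamp-ordered window with calendar-interval frame boundaries.
val df = Seq(
  (new Timestamp(1501545600L * 1000), "1"),
  (new Timestamp(1502000000L * 1000), "1")).toDF("key", "value")

// Resolving this frame used to compare the two interval literals with
// GreaterThan, which has no ordering for CalendarIntervalType.
val window = Window.partitionBy($"value").orderBy($"key")
  .rangeBetween(
    lit(CalendarInterval.fromString("interval 3 hours")),
    lit(CalendarInterval.fromString("interval 23 days 4 hours")))

df.select($"key", count("key").over(window)).show()
```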
Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7ab7f23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7ab7f23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7ab7f23

Branch: refs/heads/master
Commit: a7ab7f2348cfcd665f7815f5a9ae4d9a48383b5d
Parents: ff4bb83
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Sun Oct 28 18:15:47 2018 +0800
Committer: Xingbo Jiang <xingbo.ji...@databricks.com>
Committed: Sun Oct 28 18:15:47 2018 +0800

----------------------------------------------------------------------
 .../catalyst/expressions/windowExpressions.scala |  8 ++++++--
 .../spark/sql/DataFrameWindowFramesSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a7ab7f23/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 707f312..7de6ddd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -242,8 +242,12 @@ case class SpecifiedWindowFrame(
     case e: Expression => e.sql + " FOLLOWING"
   }
 
-  private def isGreaterThan(l: Expression, r: Expression): Boolean = {
-    GreaterThan(l, r).eval().asInstanceOf[Boolean]
+  // Check whether the left boundary value is greater than the right boundary value. It's required
+  // that both expressions have the same data type.
+  // Since CalendarIntervalType is not comparable, we only compare expressions that are AtomicType.
+  private def isGreaterThan(l: Expression, r: Expression): Boolean = l.dataType match {
+    case _: AtomicType => GreaterThan(l, r).eval().asInstanceOf[Boolean]
+    case _ => false
   }
 
   private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match {


http://git-wip-us.apache.org/repos/asf/spark/blob/a7ab7f23/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
index 2a0b2b8..9c28074 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
@@ -267,6 +267,25 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  test("range between should accept interval values as both boundaries") {
+    def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
+
+    val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
+      (ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
+      .toDF("key", "value")
+    val window = Window.partitionBy($"value").orderBy($"key")
+      .rangeBetween(lit(CalendarInterval.fromString("interval 3 hours")),
+        lit(CalendarInterval.fromString("interval 23 days 4 hours")))
+
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(window)),
+      Seq(Row(ts(1501545600), 1), Row(ts(1501545600), 1), Row(ts(1609372800), 0),
+        Row(ts(1503000000), 0), Row(ts(1502000000), 0), Row(ts(1609372800), 0))
+    )
+  }
+
   test("unbounded rows/range between with aggregation") {
     val df = Seq(("one", 1), ("two", 2), ("one", 3), ("two", 4)).toDF("key", "value")
     val window = Window.partitionBy($"key").orderBy($"value")
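As a design note, returning `false` for non-atomic types means the lower-greater-than-upper sanity check is simply skipped for interval boundaries rather than rejecting them at analysis time. A minimal standalone sketch of the guard pattern, using hypothetical toy types rather than the Spark classes:

```scala
// Toy model of the fix: attempt an ordering-based comparison only when the
// type is known to be orderable; treat everything else as "not greater than".
sealed trait ToyType
case object ToyIntType extends ToyType      // orderable, like AtomicType
case object ToyIntervalType extends ToyType // not orderable, like CalendarIntervalType

def isGreaterThan(dataType: ToyType, l: Any, r: Any): Boolean = dataType match {
  case ToyIntType => l.asInstanceOf[Int] > r.asInstanceOf[Int]
  case _          => false // skip the sanity check for non-orderable types
}

// isGreaterThan(ToyIntType, 2, 1)         => true  (invalid frame, rejected)
// isGreaterThan(ToyIntervalType, (), ())  => false (check skipped, frame accepted)
```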