[FLINK-6228] [table] Add support for OVER windows to streaming Table API. This closes #3743.
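In short: this commit adds OVER (row- and range-based) windows to the streaming Table API, so per-row aggregates can be computed over a sliding interval of records. A minimal usage sketch (the input table `tab` with columns 'a, 'b, 'c and the concrete bounds are illustrative, not part of the commit itself):

    // Scala Table API; 'rowtime selects event-time ordering ('proctime for processing time).
    val result = tab
      // per partition 'c, order rows by event time and span the two rows before
      // the current row up to and including the current row
      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
      // evaluate the aggregates over window 'w once per input row
      .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)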
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe018921 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe018921 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe018921 Branch: refs/heads/master Commit: fe018921ed0b24f94ab2139f04293d6074ce4fba Parents: 1664447 Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Thu Apr 13 17:36:18 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Apr 24 18:39:30 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/java/groupWindows.scala | 84 --- .../apache/flink/table/api/java/windows.scala | 129 ++++ .../flink/table/api/scala/expressionDsl.scala | 27 +- .../flink/table/api/scala/groupWindows.scala | 85 --- .../apache/flink/table/api/scala/windows.scala | 126 ++++ .../org/apache/flink/table/api/table.scala | 62 ++ .../org/apache/flink/table/api/windows.scala | 118 +++- .../table/expressions/ExpressionParser.scala | 64 +- .../flink/table/expressions/aggregations.scala | 36 +- .../apache/flink/table/expressions/call.scala | 211 ++++++- .../flink/table/plan/ProjectionTranslator.scala | 51 +- .../scala/stream/table/OverWindowITCase.scala | 331 ++++++++++ .../api/scala/stream/table/OverWindowTest.scala | 596 +++++++++++++++++++ .../OverWindowStringExpressionTest.scala | 151 +++++ 14 files changed, 1880 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala deleted file mode 100644 index 9c82e9b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.java - -import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow} - -/** - * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping - * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping - * windows of a specified fixed length. 
For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a tumbling window - */ - def over(size: String): TumblingWindow = new TumblingWindow(size) -} - -/** - * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements - * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive - * window evaluations. - */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. Each element is contained in three - * consecutive window evaluations. - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: String): SlideWithSize = new SlideWithSize(size) -} - -/** - * Helper class for creating a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a session window - */ - def withGap(gap: String): SessionWindow = new SessionWindow(gap) -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala new file mode 100644 index 0000000..1e9d65a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.api.java
+
+import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow}
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+
+/**
+ * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+ * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+ * elements in 5 minutes intervals.
+ */
+object Tumble {
+
+  /**
+   * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
+   * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+   * elements in 5 minutes intervals.
+   *
+   * @param size the size of the window as time or row-count interval.
+   * @return a tumbling window
+   */
+  def over(size: String): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+ * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
+ * a specified slide interval. If the slide interval is smaller than the window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple windows.
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
+ * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+ * window evaluations.
+ */
+object Slide {
+
+  /**
+   * Creates a sliding window. Sliding windows have a fixed size and slide by
+   * a specified slide interval. If the slide interval is smaller than the window size, sliding
+   * windows are overlapping. Thus, an element can be assigned to multiple windows.
+   *
+   * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+   * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+   * consecutive window evaluations.
+   *
+   * @param size the size of the window as time or row-count interval
+   * @return a partially specified sliding window
+   */
+  def over(size: String): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+ * Helper class for creating a session window. The boundaries of session windows are defined by
+ * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+ * gap period.
+ */
+object Session {
+
+  /**
+   * Creates a session window. The boundaries of session windows are defined by
+   * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+   * gap period.
+   *
+   * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+   *            closing the session window.
+   * @return a session window
+   */
+  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+}
+
+/**
+ * Helper object for creating an over window.
+ */
+object Over {
+
+  /**
+   * Specifies the time attribute on which rows are grouped.
+   *
+   * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+   *
+   * For batch tables, refer to a timestamp or long attribute.
+   */
+  def orderBy(orderBy: String): OverWindowPredefined = {
+    val orderByExpr = ExpressionParser.parseExpression(orderBy)
+    new OverWindowPredefined(Seq[Expression](), orderByExpr)
+  }
+
+  /**
+   * Partitions the elements on some partition keys.
+   *
+   * @param partitionBy some partition keys.
+   * @return a PartitionedOver instance that only contains the orderBy method.
+   */
+  def partitionBy(partitionBy: String): PartitionedOver = {
+    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
+    new PartitionedOver(partitionByExpr)
+  }
+}
+
+class PartitionedOver(private val partitionByExpr: Array[Expression]) {
+
+  /**
+   * Specifies the time attribute on which rows are grouped.
+   *
+   * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+   *
+   * For batch tables, refer to a timestamp or long attribute.
+   */
+  def orderBy(orderBy: String): OverWindowPredefined = {
+    val orderByExpr = ExpressionParser.parseExpression(orderBy)
+    new OverWindowPredefined(partitionByExpr, orderByExpr)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 06d46e3..fc6cb10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.api.scala

+import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}

 import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions._
-import java.math.{BigDecimal => JBigDecimal}

 import scala.language.implicitConversions

@@ -364,6 +365,23 @@ trait ImplicitExpressionOperations {
   def position(haystack: Expression) = Position(expr, haystack)

   /**
+    * Specifies a previously defined over window for this aggregation, e.g.:
+    * table
+    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+    */
+  def over(alias: Expression) = {
+    expr match {
+      case _: Aggregation => UnresolvedOverCall(
+        expr.asInstanceOf[Aggregation],
+        alias)
+      case _ => throw new TableException(
+        "The over method can only be used with an aggregation expression.")
+    }
+  }
+
+  /**
+    * Replaces a substring of string with a string starting at a position (starting at 1).
+    *
+    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
@@ -586,6 +604,13 @@ trait ImplicitExpressionOperations {
  * to [[ImplicitExpressionOperations]].
*/ trait ImplicitExpressionConversions { + + implicit val UNBOUNDED_ROW = UnboundedRow() + implicit val UNBOUNDED_RANGE = UnboundedRange() + + implicit val CURRENT_ROW = CurrentRow() + implicit val CURRENT_RANGE = CurrentRange() + implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations { def expr = e } http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala deleted file mode 100644 index 0e4c1c7..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.scala - -import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow} - -/** - * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping - * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping - * windows. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a tumbling window - */ - def over(size: Expression): TumblingWindow = new TumblingWindow(size) -} - -/** - * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements - * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive - * window evaluations. - */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. 
Each element is contained in three - * consecutive - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: Expression): SlideWithSize = new SlideWithSize(size) -} - -/** - * Helper object for creating a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a session window - */ - def withGap(gap: Expression): SessionWindow = new SessionWindow(gap) -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala new file mode 100644 index 0000000..1230027 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala + +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow} + +/** + * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping + * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + */ +object Tumble { + + /** + * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping + * windows. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + * + * @param size the size of the window as time or row-count interval. + * @return a tumbling window + */ + def over(size: Expression): TumblingWindow = new TumblingWindow(size) +} + +/** + * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. 
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
+ * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+ * window evaluations.
+ */
+object Slide {
+
+  /**
+   * Creates a sliding window. Sliding windows have a fixed size and slide by
+   * a specified slide interval. If the slide interval is smaller than the window size, sliding
+   * windows are overlapping. Thus, an element can be assigned to multiple windows.
+   *
+   * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+   * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+   * consecutive window evaluations.
+   *
+   * @param size the size of the window as time or row-count interval
+   * @return a partially specified sliding window
+   */
+  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+ * Helper object for creating a session window. The boundaries of session windows are defined by
+ * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+ * gap period.
+ */
+object Session {
+
+  /**
+   * Creates a session window. The boundaries of session windows are defined by
+   * intervals of inactivity, i.e., a session window closes if no event appears for a defined
+   * gap period.
+   *
+   * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+   *            closing the session window.
+   * @return a session window
+   */
+  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+}
+
+/**
+ * Helper object for creating an over window.
+ */
+object Over {
+
+  /**
+   * Specifies the time attribute on which rows are grouped.
+   *
+   * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+   *
+   * For batch tables, refer to a timestamp or long attribute.
+   */
+  def orderBy(orderBy: Expression): OverWindowPredefined = {
+    new OverWindowPredefined(Seq[Expression](), orderBy)
+  }
+
+  /**
+   * Partitions the elements on some partition keys.
+   *
+   * @param partitionBy some partition keys.
+   * @return a PartitionedOver instance that only contains the orderBy method.
+   */
+  def partitionBy(partitionBy: Expression*): PartitionedOver = {
+    PartitionedOver(partitionBy.toArray)
+  }
+}
+
+case class PartitionedOver(partitionBy: Array[Expression]) {
+
+  /**
+   * Specifies the time attribute on which rows are grouped.
+   *
+   * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+   *
+   * For batch tables, refer to a timestamp or long attribute.
+ */ + def orderBy(orderBy: Expression): OverWindowPredefined = { + new OverWindowPredefined(partitionBy, orderBy) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 769e940..c688aab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.sinks.TableSink import _root_.scala.collection.JavaConverters._ +import _root_.scala.annotation.varargs /** * A Table is the core component of the Table API. @@ -810,6 +811,44 @@ class Table( new WindowedTable(this, window) } + /** + * Defines over-windows on the records of a table. + * + * An over-window defines for each record an interval of records over which aggregation + * functions can be computed. + * + * Example: + * + * {{{ + * table + * .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow) + * .select('c, 'b.count over 'ow, 'e.sum over 'ow) + * }}} + * + * __Note__: Computing over window aggregates on a streaming table is only a parallel operation + * if the window is partitioned. Otherwise, the whole stream will be processed by a single + * task, i.e., with parallelism 1. + * + * __Note__: Over-windows for batch tables are currently not supported. + * + * @param overWindows windows that specify the record interval over which aggregations are + * computed. + * @return An OverWindowedTable to specify the aggregations. 
+   */
+  @varargs
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw TableException("Over-windows for batch tables are currently not supported.")
+    }
+
+    if (overWindows.size != 1) {
+      throw TableException("Currently, only a single over window is supported.")
+    }
+
+    new OverWindowedTable(this, overWindows.toArray)
+  }
+
   var tableName: String = _

   /**
@@ -928,6 +967,29 @@ class WindowedTable(
 }

+class OverWindowedTable(
+    private[flink] val table: Table,
+    private[flink] val overWindows: Array[OverWindow]) {
+
+  def select(fields: Expression*): Table = {
+    val expandedFields = expandProjectList(
+      fields,
+      table.logicalPlan,
+      table.tableEnv)
+
+    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
+
+    new Table(
+      table.tableEnv,
+      Project(expandedOverFields.map(UnresolvedAlias), table.logicalPlan).validate(table.tableEnv))
+  }
+
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+}
+
 class WindowGroupedTable(
   private[flink] val table: Table,
   private[flink] val groupKeys: Seq[Expression],

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 5ba6934..577d77c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -18,8 +18,124 @@
 package org.apache.flink.table.api

-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
+
+/**
+ * An over window is similar to the OVER clause of standard SQL.
+ */
+case class OverWindow(
+    private[flink] val alias: Expression,
+    private[flink] val partitionBy: Seq[Expression],
+    private[flink] val orderBy: Expression,
+    private[flink] val preceding: Expression,
+    private[flink] val following: Expression)
+
+case class CurrentRow() extends Expression {
+  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def children = Seq()
+  override def toString = "CURRENT ROW"
+}
+
+case class CurrentRange() extends Expression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  override private[flink] def children = Seq()
+  override def toString = "CURRENT RANGE"
+}
+
+case class UnboundedRow() extends Expression {
+  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def children = Seq()
+  override def toString = "UNBOUNDED ROW"
+}
+
+case class UnboundedRange() extends Expression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  override private[flink] def children = Seq()
+  override def toString = "UNBOUNDED RANGE"
+}
+
+/**
+ * A predefined over window specification.
+ */
+class OverWindowPredefined(
+    private val partitionBy: Seq[Expression],
+    private val orderBy: Expression) {
+
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = _
+
+  /**
+   * Assigns an alias for this window that the following `select()` clause can refer to.
+   *
+   * @param alias alias for this over window
+   * @return an over window
+   */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+   * Assigns an alias for this window that the following `select()` clause can refer to.
+   *
+   * @param alias alias for this over window
+   * @return an over window
+   */
+  def as(alias: Expression): OverWindow = {
+
+    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
+    if (null == following) {
+      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
+        following = CURRENT_ROW
+      } else {
+        following = CURRENT_RANGE
+      }
+    }
+    OverWindow(alias, partitionBy, orderBy, preceding, following)
+  }
+
+  /**
+   * Sets the preceding offset (based on a time or row-count interval) for the over window.
+   *
+   * @param preceding preceding offset relative to the current row.
+   * @return this over window
+   */
+  def preceding(preceding: String): OverWindowPredefined = {
+    this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+   * Sets the preceding offset (based on a time or row-count interval) for the over window.
+   *
+   * @param preceding preceding offset relative to the current row.
+   * @return this over window
+   */
+  def preceding(preceding: Expression): OverWindowPredefined = {
+    this.preceding = preceding
+    this
+  }
+
+  /**
+   * Sets the following offset (based on a time or row-count interval) for the over window.
+   *
+   * @param following following offset relative to the current row.
+   * @return this over window
+   */
+  def following(following: String): OverWindowPredefined = {
+    this.following(ExpressionParser.parseExpression(following))
+  }
+
+  /**
+   * Sets the following offset (based on a time or row-count interval) for the over window.
+   *
+   * @param following following offset relative to the current row.
+   * @return this over window
+   */
+  def following(following: Expression): OverWindowPredefined = {
+    this.following = following
+    this
+  }
+}

 /**
  * A window specification.
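Taken together, Over/PartitionedOver and OverWindowPredefined form a small builder chain that terminates in the OverWindow case class above. A sketch of a fully specified window in the Scala DSL (the field names are illustrative; UNBOUNDED_RANGE and CURRENT_RANGE are the implicit constants added to ImplicitExpressionConversions earlier in this commit):

    // Over.partitionBy -> PartitionedOver.orderBy -> OverWindowPredefined
    //   -> preceding [-> following] -> as, yielding an OverWindow
    val w: OverWindow = Over
      .partitionBy('c)            // optional; unpartitioned over windows run with parallelism 1
      .orderBy('rowtime)          // event-time order ('proctime for processing time)
      .preceding(UNBOUNDED_RANGE) // lower bound as a time-range interval
      .as('w)                     // as() defaults following to CURRENT_RANGE for range intervals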
http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index ed0b16e..1985248 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.expressions import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.table.api.ExpressionParserException +import org.apache.flink.table.api.{ExpressionParserException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange} import org.apache.flink.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval} import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit @@ -87,11 +87,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val STAR: Keyword = Keyword("*") lazy val GET: Keyword = Keyword("get") lazy val FLATTEN: Keyword = Keyword("flatten") + lazy val OVER: Keyword = Keyword("over") + lazy val CURRENT_ROW: Keyword = Keyword("current_row") + lazy val CURRENT_RANGE: Keyword = Keyword("current_range") + lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row") + lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range") def functionIdent: ExpressionParser.Parser[String] = not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ - not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ - not(IF) ~> super.ident + not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~ + not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~> + super.ident // symbols @@ -166,8 +172,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { dt => Null(dt) } + // OVER constants + lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ { + _ => CurrentRange() + } + lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ { + _ => CurrentRow() + } + lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ { + _ => UnboundedRange() + } + lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ { + _ => UnboundedRow() + } + lazy val overConstant: PackratParser[Expression] = + currentRange | currentRow | unboundedRange | unboundedRow + lazy val literalExpr: PackratParser[Expression] = - numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral + numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral | + overConstant lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ { sym => UnresolvedFieldReference(sym) @@ -289,12 +312,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffixFlattening: PackratParser[Expression] = composite <~ "." 
~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) } + lazy val suffixAgg: PackratParser[Expression] = + suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg + lazy val suffixed: PackratParser[Expression] = - suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart | - suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | - suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate | - suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil | - suffixGet | suffixFlattening | + suffixTimeInterval | suffixRowInterval | suffixStart | suffixEnd | suffixAgg | + suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc | + suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | suffixExtract | + suffixFloor | suffixCeil | suffixGet | suffixFlattening | suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end // prefix operators @@ -375,15 +400,26 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixFlattening: PackratParser[Expression] = FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) } + lazy val prefixAgg: PackratParser[Expression] = + prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg + lazy val prefixed: PackratParser[Expression] = - prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | - prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | - prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening | - prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end + prefixArray | prefixAgg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | + prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | + prefixFlattening | prefixFunctionCall | + prefixFunctionCallOneArg // function call must always be at the end + + // over + + lazy val over: PackratParser[Expression] = suffixAgg ~ OVER ~ fieldReference ^^ { + case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef) + } | prefixAgg ~ OVER ~ fieldReference ^^ { + case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef) + } // suffix/prefix composite - lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom | + lazy val composite: PackratParser[Expression] = suffixed | over | prefixed | atom | failure("Composite expression expected.") // unary ops http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala index b2fca88..4ef5209 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala @@ -18,11 +18,14 @@ package org.apache.flink.table.expressions import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.SqlAggFunction +import org.apache.calcite.sql.fun._ +import org.apache.calcite.sql.SqlKind._ import org.apache.calcite.tools.RelBuilder import 
org.apache.calcite.tools.RelBuilder.AggCall -import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.calcite.FlinkTypeFactory abstract sealed class Aggregation extends UnaryExpression { @@ -35,6 +38,12 @@ abstract sealed class Aggregation extends UnaryExpression { * Convert Aggregate to its counterpart in Calcite, i.e. AggCall */ private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall + + /** + * Returns the SqlAggFunction for this Aggregation. + */ + private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction + } case class Sum(child: Expression) extends Aggregation { @@ -48,6 +57,13 @@ case class Sum(child: Expression) extends Aggregation { override private[flink] def validateInput() = TypeCheckUtils.assertNumericExpr(child.resultType, "sum") + + override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { + val returnType = relBuilder + .getTypeFactory.asInstanceOf[FlinkTypeFactory] + .createTypeFromTypeInfo(resultType) + new SqlSumAggFunction(returnType) + } } case class Min(child: Expression) extends Aggregation { @@ -61,6 +77,10 @@ case class Min(child: Expression) extends Aggregation { override private[flink] def validateInput() = TypeCheckUtils.assertOrderableExpr(child.resultType, "min") + + override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { + new SqlMinMaxAggFunction(MIN) + } } case class Max(child: Expression) extends Aggregation { @@ -74,6 +94,10 @@ case class Max(child: Expression) extends Aggregation { override private[flink] def validateInput() = TypeCheckUtils.assertOrderableExpr(child.resultType, "max") + + override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { + new SqlMinMaxAggFunction(MAX) + } } case class Count(child: Expression) extends Aggregation { @@ -84,6 +108,10 @@ case class Count(child: Expression) extends Aggregation { } override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO + + override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { + new SqlCountAggFunction() + } } case class Avg(child: Expression) extends Aggregation { @@ -97,4 +125,8 @@ case class Avg(child: Expression) extends Aggregation { override private[flink] def validateInput() = TypeCheckUtils.assertNumericExpr(child.resultType, "avg") + + override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { + new SqlAvgAggFunction(AVG) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala index 40db13e..68ed688 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -17,15 +17,25 @@ */ package org.apache.flink.table.expressions -import org.apache.calcite.rex.RexNode +import java.util + +import com.google.common.collect.ImmutableList +import org.apache.calcite.rex.RexWindowBound._ +import org.apache.calcite.rex.{RexFieldCollation, RexNode, RexWindowBound} 
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
+import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions._
 import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+import _root_.scala.collection.JavaConverters._

 /**
  * General expression for unresolved function calls. The function can be a built-in
@@ -49,6 +59,201 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }

 /**
+ * Over call with an unresolved alias for the over window.
+ *
+ * @param agg The aggregation of the over call.
+ * @param alias The alias of the referenced over window.
+ */
+case class UnresolvedOverCall(agg: Expression, alias: Expression) extends Expression {
+
+  override private[flink] def validateInput() =
+    ValidationFailure(s"Over window with alias $alias could not be resolved.")
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def children = Seq()
+}
+
+/**
+ * Over expression for Calcite over transform.
+ *
+ * @param agg over-agg expression
+ * @param partitionBy The fields by which the over window is partitioned
+ * @param orderBy The field by which the over window is sorted
+ * @param preceding The lower bound of the window
+ * @param following The upper bound of the window
+ */
+case class OverCall(
+    agg: Expression,
+    partitionBy: Seq[Expression],
+    orderBy: Expression,
+    preceding: Expression,
+    following: Expression) extends Expression {
+
+  override def toString: String = s"$agg OVER (" +
+    s"PARTITION BY (${partitionBy.mkString(", ")}) " +
+    s"ORDER BY $orderBy " +
+    s"PRECEDING $preceding " +
+    s"FOLLOWING $following)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+
+    val rexBuilder = relBuilder.getRexBuilder
+
+    // assemble aggregation
+    val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
+    val aggResultType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(agg.resultType)
+
+    val aggChildName = agg.asInstanceOf[Aggregation].child.asInstanceOf[ResolvedFieldReference].name
+    val aggExprs = List(relBuilder.field(aggChildName).asInstanceOf[RexNode]).asJava
+
+    // assemble order by key
+    val orderKey = orderBy match {
+      case _: RowTime =>
+        new RexFieldCollation(relBuilder.call(EventTimeExtractor), Set[SqlKind]().asJava)
+      case _: ProcTime =>
+        new RexFieldCollation(relBuilder.call(ProcTimeExtractor), Set[SqlKind]().asJava)
+      case _ =>
+        throw new ValidationException("Invalid OrderBy expression.")
+    }
+    val orderKeys = ImmutableList.of(orderKey)
+
+    // assemble partition by keys
+    val partitionKeys = partitionBy.map(_.toRexNode(relBuilder)).asJava
+
+    // assemble bounds
+    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
+    val
upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING) + + // build RexOver + rexBuilder.makeOver( + aggResultType, + operator, + aggExprs, + partitionKeys, + orderKeys, + lowerBound, + upperBound, + isPhysical, + true, + false) + } + + private def createBound( + relBuilder: RelBuilder, + bound: Expression, + sqlKind: SqlKind): RexWindowBound = { + + bound match { + case _: UnboundedRow | _: UnboundedRange => + val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO) + create(unbounded, null) + case _: CurrentRow | _: CurrentRange => + val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO) + create(currentRow, null) + case b: Literal => + val returnType = relBuilder + .getTypeFactory.asInstanceOf[FlinkTypeFactory] + .createTypeFromTypeInfo(Types.DECIMAL) + + val sqlOperator = new SqlPostfixOperator( + sqlKind.name, + sqlKind, + 2, + new OrdinalReturnTypeInference(0), + null, + null) + + val operands: Array[SqlNode] = new Array[SqlNode](1) + operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO) + + val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO) + + val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() + expressions.add(relBuilder.literal(b.value)) + + val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions) + + create(node, rexNode) + } + } + + override private[flink] def children: Seq[Expression] = + Seq(agg) ++ Seq(orderBy) ++ partitionBy ++ Seq(preceding) ++ Seq(following) + + override private[flink] def resultType = agg.resultType + + override private[flink] def validateInput(): ValidationResult = { + + // check that agg expression is aggregation + agg match { + case _: Aggregation => + ValidationSuccess + case _ => + return ValidationFailure(s"OVER can only be applied on an aggregation.") + } + + // check partitionBy expression keys are resolved field reference + partitionBy.foreach { + case r: ResolvedFieldReference if r.resultType.isKeyType => + ValidationSuccess + case r: ResolvedFieldReference => + return ValidationFailure(s"Invalid PartitionBy expression: $r. " + + s"Expression must return key type.") + case r => + return ValidationFailure(s"Invalid PartitionBy expression: $r. 
" + + s"Expression must be a resolved field reference.") + } + + // check preceding is valid + preceding match { + case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange => + ValidationSuccess + case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 => + ValidationSuccess + case Literal(_, _: RowIntervalTypeInfo) => + return ValidationFailure("Preceding row interval must be larger than 0.") + case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 => + ValidationSuccess + case Literal(_, _: TimeIntervalTypeInfo[_]) => + return ValidationFailure("Preceding time interval must be equal or larger than 0.") + case Literal(_, _) => + return ValidationFailure("Preceding must be a row interval or time interval literal.") + } + + // check following is valid + following match { + case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange => + ValidationSuccess + case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 => + ValidationSuccess + case Literal(_, _: RowIntervalTypeInfo) => + return ValidationFailure("Following row interval must be larger than 0.") + case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 => + ValidationSuccess + case Literal(_, _: TimeIntervalTypeInfo[_]) => + return ValidationFailure("Following time interval must be equal or larger than 0.") + case Literal(_, _) => + return ValidationFailure("Following must be a row interval or time interval literal.") + } + + // check that preceding and following are of same type + (preceding, following) match { + case (p: Expression, f: Expression) if p.resultType == f.resultType => + ValidationSuccess + case _ => + return ValidationFailure("Preceding and following must be of same interval type.") + } + + ValidationSuccess + } +} + +/** * Expression for calling a user-defined scalar functions. 
* * @param scalarFunction scalar function to be called (might be overloaded) http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index 94a0aa1..0d45a37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -19,8 +19,9 @@ package org.apache.flink.table.plan import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{OverWindow, StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.{ProcTime, RowTime} import org.apache.flink.table.plan.logical.{LogicalNode, Project} import scala.collection.mutable @@ -221,6 +222,54 @@ object ProjectionTranslator { projectList } + def resolveOverWindows( + exprs: Seq[Expression], + overWindows: Array[OverWindow], + tEnv: TableEnvironment): Seq[Expression] = { + + def resolveOverWindow(unresolvedCall: UnresolvedOverCall): Expression = { + + val overWindow = overWindows.find(_.alias.equals(unresolvedCall.alias)) + if (overWindow.isDefined) { + if (tEnv.isInstanceOf[StreamTableEnvironment]) { + val timeIndicator = overWindow.get.orderBy match { + case u: UnresolvedFieldReference if u.name.toLowerCase == "rowtime" => + RowTime() + case u: UnresolvedFieldReference if u.name.toLowerCase == "proctime" => + ProcTime() + case e: Expression => e + } + OverCall( + unresolvedCall.agg, + overWindow.get.partitionBy, + timeIndicator, + overWindow.get.preceding, + overWindow.get.following) + } else { + OverCall( + unresolvedCall.agg, + overWindow.get.partitionBy, + overWindow.get.orderBy, + overWindow.get.preceding, + overWindow.get.following) + } + } else { + unresolvedCall + } + } + + val projectList = new ListBuffer[Expression] + exprs.foreach { + case Alias(u: UnresolvedOverCall, name, _) => + projectList += Alias(resolveOverWindow(u), name) + case u: UnresolvedOverCall => + projectList += resolveOverWindow(u) + case e: Expression => projectList += e + } + projectList + } + + /** * Extract all field references from the given expressions. * http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala new file mode 100644 index 0000000..8464ba1 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.RowTimeSourceFunction +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class OverWindowITCase extends StreamingWithStateTestBase { + + @Test + def testProcTimeUnBoundedPartitionedRowOver(): Unit = { + + val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'a, 'b, 'c) + + val windowedTable = table + .window( + Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w) + .select('c, 'b.count over 'w as 'mycount) + .select('c, 'mycount) + + val results = windowedTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq( + "Hello World,1", "Hello World,2", "Hello World,3", + "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeUnBoundedPartitionedRangeOver(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear + env.setParallelism(1) + + val data = Seq( + Left(14000005L, (1, 1L, "Hi")), + Left(14000000L, (2, 1L, "Hello")), + Left(14000002L, (1, 1L, "Hello")), + Left(14000002L, (1, 2L, "Hello")), + Left(14000002L, (1, 3L, "Hello world")), + Left(14000003L, (2, 2L, "Hello world")), + Left(14000003L, (2, 3L, "Hello world")), + Right(14000020L), + Left(14000021L, (1, 4L, "Hello world")), + Left(14000022L, (1, 5L, "Hello world")), + Left(14000022L, (1, 6L, "Hello world")), + Left(14000022L, (1, 7L, "Hello world")), + Left(14000023L, (2, 4L, "Hello world")), + Left(14000023L, (2, 5L, "Hello 
world")), + Right(14000030L) + ) + val table = env + .addSource(new RowTimeSourceFunction[(Int, Long, String)](data)) + .toTable(tEnv).as('a, 'b, 'c) + + val windowedTable = table + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following + CURRENT_RANGE as 'w) + .select( + 'a, 'b, 'c, + 'b.sum over 'w, + 'b.count over 'w, + 'b.avg over 'w, + 'b.max over 'w, + 'b.min over 'w) + + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1,Hello,6,3,2,3,1", + "1,2,Hello,6,3,2,3,1", + "1,3,Hello world,6,3,2,3,1", + "1,1,Hi,7,4,1,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,6,3,2,3,1", + "2,3,Hello world,6,3,2,3,1", + "1,4,Hello world,11,5,2,4,1", + "1,5,Hello world,29,8,3,7,1", + "1,6,Hello world,29,8,3,7,1", + "1,7,Hello world,29,8,3,7,1", + "2,4,Hello world,15,5,3,5,1", + "2,5,Hello world,15,5,3,5,1" + ) + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProcTimeBoundedPartitionedRangeOver(): Unit = { + + val data = List( + (1, 1L, 0, "Hallo", 1L), + (2, 2L, 1, "Hallo Welt", 2L), + (2, 3L, 2, "Hallo Welt wie", 1L), + (3, 4L, 3, "Hallo Welt wie gehts?", 2L), + (3, 5L, 4, "ABC", 2L), + (3, 6L, 5, "BCD", 3L), + (4, 7L, 6, "CDE", 2L), + (4, 8L, 7, "DEF", 1L), + (4, 9L, 8, "EFG", 1L), + (4, 10L, 9, "FGH", 2L), + (5, 11L, 10, "GHI", 1L), + (5, 12L, 11, "HIJ", 3L), + (5, 13L, 12, "IJK", 3L), + (5, 14L, 13, "JKL", 2L), + (5, 15L, 14, "KLM", 2L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + + val windowedTable = table + .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w) + .select('a, 'c.sum over 'w, 'c.min over 'w) + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,3,1", + "3,3,3", + "3,7,3", + "3,12,3", + "4,6,6", + "4,13,6", + "4,21,6", + "4,30,6", + "5,10,10", + "5,21,10", + "5,33,10", + "5,46,10", + "5,60,10") + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeBoundedPartitionedRowOver(): Unit = { + val data = Seq( + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((3L, (7L, 7, "Hello World"))), + Left((1L, (7L, 7, "Hello World"))), + Left((1L, (7L, 7, "Hello World"))), + Right(2L), + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), + Right(6L), + Left((8L, (8L, 8, "Hello World"))), + Left((7L, (7L, 7, "Hello World"))), + Right(20L)) + + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val table = env.addSource[(Long, Int, String)]( + new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c) + + val windowedTable = table + .window(Over 
partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) + .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) + + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3", + "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6", + "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12", + "Hello,6,3,15", + "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21", + "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeBoundedPartitionedRangeOver(): Unit = { + val data = Seq( + Left((1500L, (1L, 15, "Hello"))), + Left((1600L, (1L, 16, "Hello"))), + Left((1000L, (1L, 1, "Hello"))), + Left((2000L, (2L, 2, "Hello"))), + Right(1000L), + Left((2000L, (2L, 2, "Hello"))), + Left((2000L, (2L, 3, "Hello"))), + Left((3000L, (3L, 3, "Hello"))), + Right(2000L), + Left((4000L, (4L, 4, "Hello"))), + Right(3000L), + Left((5000L, (5L, 5, "Hello"))), + Right(5000L), + Left((6000L, (6L, 6, "Hello"))), + Left((6500L, (6L, 65, "Hello"))), + Right(7000L), + Left((9000L, (6L, 9, "Hello"))), + Left((9500L, (6L, 18, "Hello"))), + Left((9000L, (6L, 9, "Hello"))), + Right(10000L), + Left((10000L, (7L, 7, "Hello World"))), + Left((11000L, (7L, 17, "Hello World"))), + Left((11000L, (7L, 77, "Hello World"))), + Right(12000L), + Left((14000L, (7L, 18, "Hello World"))), + Right(14000L), + Left((15000L, (8L, 8, "Hello World"))), + Right(17000L), + Left((20000L, (20L, 20, "Hello World"))), + Right(19000L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val table = env.addSource[(Long, Int, String)]( + new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c) + + val windowedTable = table + .window( + Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w) + .select('c, 'b, 'a.count over 'w, 'a.sum over 'w) + + val result = windowedTable.toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", + "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9", + "Hello,3,4,9", + "Hello,4,2,7", + "Hello,5,2,9", + "Hello,6,2,11", "Hello,65,2,12", + "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18", + "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7", + "Hello World,8,2,15", + "Hello World,20,1,20") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } +} + +object OverWindowITCase { + + class RowTimeSourceFunction[T]( + dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] { + override def run(ctx: SourceContext[T]): Unit = { + dataWithTimestampList.foreach { + case Left(t) => ctx.collectWithTimestamp(t._2, t._1) + case Right(w) => ctx.emitWatermark(new Watermark(w)) + } + } + + override def cancel(): Unit = ??? 
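+ // cancel() is never invoked by these tests: the bounded input is fully emitted and each job runs to completion, so leaving it unimplemented (???) is safe here.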
+ } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala new file mode 100644 index 0000000..26f4ead --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.junit.Test + +class OverWindowTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) + + @Test(expected = classOf[ValidationException]) + def testInvalidWindowAlias(): Unit = { + val result = table + .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) + .select('c, 'b.count over 'x) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testOrderBy(): Unit = { + val result = table + .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testPrecedingAndFollowingUsingIsLiteral(): Unit = { + val result = table + .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testPrecedingAndFollowingUsingSameType(): Unit = { + val result = table + .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testPartitionByWithUnresolved(): Unit = { + val result = table + .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testPartitionByWithNotKeyType(): Unit = { + val table2 = 
streamUtil.addTable[(Int, String, Either[Long, String])]("MyTable2", 'a, 'b, 'c) + + val result = table2 + .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testPrecedingValue(): Unit = { + val result = table + .window(Over orderBy 'rowtime preceding -1.rows as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test(expected = classOf[ValidationException]) + def testFollowingValue(): Unit = { + val result = table + .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w) + .select('c, 'b.count over 'w) + streamUtil.tEnv.optimize(result.getRelNode) + } + + @Test + def testProcTimeBoundedPartitionedRowsOver() = { + val result = table + .window(Over partitionBy 'c orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w) + .select('c, 'b.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "b", "c", "PROCTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "b", "c", "PROCTIME", "COUNT(b) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeBoundedPartitionedRangeOver() = { + val result = table + .window( + Over partitionBy 'a orderBy 'proctime preceding 2.hours following CURRENT_RANGE as 'w) + .select('a, 'c.avg over 'w as 'myAvg) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("partitionBy", "a"), + term("orderBy", "PROCTIME"), + term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "PROCTIME", + "AVG(c) AS w0$o0" + ) + ), + term("select", "a", "w0$o0 AS myAvg") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeBoundedNonPartitionedRangeOver() = { + val result = table + .window(Over orderBy 'proctime preceding 10.second as 'w) + .select('a, 'c.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0") + ), + term("select", "a", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeBoundedNonPartitionedRowsOver() = { + val result = table + .window(Over orderBy 'proctime preceding 2.rows as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeUnboundedPartitionedRangeOver() = { + val result = table + .window(Over partitionBy 'c orderBy 'proctime preceding 
UNBOUNDED_RANGE following + CURRENT_RANGE as 'w) + .select('a, 'c, 'a.count over 'w, 'a.sum over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "PROCTIME"), + term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "PROCTIME", + "COUNT(a) AS w0$o0", + "SUM(a) AS w0$o1" + ) + ), + term( + "select", + "a", + "c", + "w0$o0 AS _c2", + "w0$o1 AS _c3" + ) + ) + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeUnboundedPartitionedRowsOver() = { + val result = table + .window( + Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeUnboundedNonPartitionedRangeOver() = { + val result = table + .window( + Over orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) + .select('a, 'c, 'a.count over 'w, 'a.sum over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "PROCTIME", + "COUNT(a) AS w0$o0", + "SUM(a) AS w0$o1" + ) + ), + term( + "select", + "a", + "c", + "w0$o0 AS _c2", + "w0$o1 AS _c3" + ) + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testProcTimeUnboundedNonPartitionedRowsOver() = { + val result = table + .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeBoundedPartitionedRowsOver() = { + val result = table + .window( + Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) + .select('c, 'b.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "b", "c", "ROWTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "ROWTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "b", "c", "ROWTIME", "COUNT(b) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeBoundedPartitionedRangeOver() = { + val result = table + .window( + Over partitionBy 'a orderBy 'rowtime preceding 2.hours following 
CURRENT_RANGE as 'w) + .select('a, 'c.avg over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("partitionBy", "a"), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "ROWTIME", + "AVG(c) AS w0$o0" + ) + ), + term("select", "a", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeBoundedNonPartitionedRangeOver() = { + val result = table + .window(Over orderBy 'rowtime preceding 10.second as 'w) + .select('a, 'c.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", "COUNT(c) AS w0$o0") + ), + term("select", "a", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeBoundedNonPartitionedRowsOver() = { + val result = table + .window(Over orderBy 'rowtime preceding 2.rows as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("orderBy", "ROWTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeUnboundedPartitionedRangeOver() = { + val result = table + .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE following + CURRENT_RANGE as 'w) + .select('a, 'c, 'a.count over 'w, 'a.sum over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "ROWTIME", + "COUNT(a) AS w0$o0", + "SUM(a) AS w0$o1" + ) + ), + term( + "select", + "a", + "c", + "w0$o0 AS _c2", + "w0$o1 AS _c3" + ) + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeUnboundedPartitionedRowsOver() = { + val result = table + .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following + CURRENT_ROW as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "ROWTIME"), + term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + + @Test + def testRowTimeUnboundedNonPartitionedRangeOver() = { + val result = table + .window( + Over orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) + .select('a, 'c, 'a.count over 'w, 'a.sum over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", 
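+ // Expected plan shape shared by the assertions in this file: a final DataStreamCalc projection on top of a DataStreamOverAggregate, which reads from a DataStreamCalc that appends the PROCTIME()/ROWTIME() time attribute to the scanned table.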
+ unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("orderBy", "ROWTIME"), + term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term( + "select", + "a", + "c", + "ROWTIME", + "COUNT(a) AS w0$o0", + "SUM(a) AS w0$o1" + ) + ), + term( + "select", + "a", + "c", + "w0$o0 AS _c2", + "w0$o1 AS _c3" + ) + ) + + streamUtil.verifyTable(result, expected) + } + +
@Test + def testRowTimeUnboundedNonPartitionedRowsOver() = { + val result = table + .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) + .select('c, 'a.count over 'w) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "ROWTIME() AS $2") + ), + term("orderBy", "ROWTIME"), + term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS _c1") + ) + + streamUtil.verifyTable(result, expected) + } + +} + +object OverWindowTest { + case class Pojo(id: Long, name: String) +}
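For reference, the shape of the new API distilled from the tests above into one minimal, self-contained sketch: a running per-key count over processing time. It uses only constructs that appear in this commit's diff; the object name OverWindowExample and the alias 'cnt are illustrative and not part of the commit.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object OverWindowExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Register a small stream as a table with columns 'a, 'b, 'c.
    val table = env
      .fromCollection(Seq((1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello World")))
      .toTable(tEnv, 'a, 'b, 'c)

    // Unbounded processing-time OVER window: for every incoming row, emit the
    // count of 'b across all rows with the same 'c seen so far.
    val result = table
      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select('c, 'b.count over 'w as 'cnt)

    result.toDataStream[Row].print()
    env.execute()
  }
}

Given the semantics exercised by testProcTimeUnBoundedPartitionedRowOver, this sketch would print "Hello,1", "Hello,2" and "Hello World,1" as the three rows arrive.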