This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 59afddb [FLINK-11844][table-api] Simplify over window API classes and improve documentation 59afddb is described below commit 59afddbcd3b3737d30c48943f9fbf8445bad23e2 Author: Timo Walther <twal...@apache.org> AuthorDate: Wed Mar 6 21:45:08 2019 +0100 [FLINK-11844][table-api] Simplify over window API classes and improve documentation This closes #7920. --- .../org/apache/flink/table/api/java/windows.scala | 81 +++-------- .../org/apache/flink/table/api/scala/windows.scala | 73 +++------- .../scala/org/apache/flink/table/api/windows.scala | 160 ++++++++++++++++----- .../flink/table/plan/ProjectionTranslator.scala | 28 ++-- 4 files changed, 179 insertions(+), 163 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala index 121aab8..5f7f422 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -18,9 +18,8 @@ package org.apache.flink.table.api.java -import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE} -import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.api._ +import org.apache.flink.table.expressions.ExpressionParser /** * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -86,80 +85,36 @@ object Session { } /** - * Helper object for creating a over window. + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. 
*/ object Over { /** - * Specifies the time attribute on which rows are grouped. + * Specifies the time attribute on which rows are ordered. * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. * * For batch tables, refer to a timestamp or long attribute. - */ - def orderBy(orderBy: String): OverWindowWithOrderBy = { - val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowWithOrderBy(Array[Expression](), orderByExpr) - } - - /** - * Partitions the elements on some partition keys. - * - * @param partitionBy some partition keys. - * @return A partitionedOver instance that only contains the orderBy method. - */ - def partitionBy(partitionBy: String): PartitionedOver = { - val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray - new PartitionedOver(partitionByExpr) - } -} - -class PartitionedOver(private val partitionByExpr: Array[Expression]) { - - /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. * - * For batch tables, refer to a timestamp or long attribute. + * @param orderBy field reference + * @return an over window with defined order */ - def orderBy(orderBy: String): OverWindowWithOrderBy = { - val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowWithOrderBy(partitionByExpr, orderByExpr) + def orderBy(orderBy: String): OverWindowPartitionedOrdered = { + new OverWindowPartitionedOrdered(Seq(), ExpressionParser.parseExpression(orderBy)) } -} - - -class OverWindowWithOrderBy( - private val partitionByExpr: Array[Expression], - private val orderByExpr: Expression) { /** - * Set the preceding offset (based on time or row-count intervals) for over window. - * - * @param preceding preceding offset relative to the current row. 
- * @return this over window - */ - def preceding(preceding: String): OverWindowWithPreceding = { - val precedingExpr = ExpressionParser.parseExpression(preceding) - new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr) - } - - /** - * Assigns an alias for this window that the following `select()` clause can refer to. + * Partitions the elements on some partition keys. * - * @param alias alias for this over window - * @return over window - */ - def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) - - /** - * Assigns an alias for this window that the following `select()` clause can refer to. + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. * - * @param alias alias for this over window - * @return over window + * @param partitionBy list of field references + * @return an over window with defined partitioning */ - def as(alias: Expression): OverWindow = { - OverWindow(alias, partitionByExpr, orderByExpr, UNBOUNDED_RANGE, CURRENT_RANGE) + def partitionBy(partitionBy: String): OverWindowPartitioned = { + new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy)) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 2f88248..5af1d09 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -18,8 +18,8 @@ package org.apache.flink.table.api.scala -import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} -import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.api._ +import org.apache.flink.table.expressions.Expression /** * Helper 
object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -85,73 +85,36 @@ object Session { } /** - * Helper object for creating a over window. + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. */ object Over { /** - * Specifies the time attribute on which rows are grouped. + * Specifies the time attribute on which rows are ordered. * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. * * For batch tables, refer to a timestamp or long attribute. - */ - def orderBy(orderBy: Expression): OverWindowWithOrderBy = { - new OverWindowWithOrderBy(Seq[Expression](), orderBy) - } - - /** - * Partitions the elements on some partition keys. * - * @param partitionBy some partition keys. - * @return A partitionedOver instance that only contains the orderBy method. + * @param orderBy field reference + * @return an over window with defined order */ - def partitionBy(partitionBy: Expression*): PartitionedOver = { - PartitionedOver(partitionBy.toArray) + def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = { + new OverWindowPartitionedOrdered(Seq(), orderBy) } -} - -case class PartitionedOver(partitionBy: Array[Expression]) { /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. - * - * For batch tables, refer to a timestamp or long attribute. - */ - def orderBy(orderBy: Expression): OverWindowWithOrderBy = { - OverWindowWithOrderBy(partitionBy, orderBy) - } -} - -case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) { - - /** - * Set the preceding offset (based on time or row-count intervals) for over window. 
- * - * @param preceding preceding offset relative to the current row. - * @return this over window - */ - def preceding(preceding: Expression): OverWindowWithPreceding = { - new OverWindowWithPreceding(partitionBy, orderBy, preceding) - } - - /** - * Assigns an alias for this window that the following `select()` clause can refer to. + * Partitions the elements on some partition keys. * - * @param alias alias for this over window - * @return over window - */ - def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) - - /** - * Assigns an alias for this window that the following `select()` clause can refer to. + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. * - * @param alias alias for this over window - * @return over window + * @param partitionBy list of field references + * @return an over window with defined partitioning */ - def as(alias: Expression): OverWindow = { - OverWindow(alias, partitionBy, orderBy, UNBOUNDED_RANGE, CURRENT_RANGE) + def partitionBy(partitionBy: Expression*): OverWindowPartitioned = { + new OverWindowPartitioned(partitionBy) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala index 960619e..5dd6a4c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala @@ -20,34 +20,130 @@ package org.apache.flink.table.api import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical._ -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} -import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW} /** - * Over window is similar to the traditional OVER SQL. + * An over window specification. 
+ * + * Similar to SQL, over window aggregates compute an aggregate for each input row over a range + * of its neighboring rows. + */ +class OverWindow( + alias: Expression, + partitionBy: Seq[Expression], + orderBy: Expression, + preceding: Expression, + following: Option[Expression]) { + + def getAlias: Expression = alias + + def getPartitioning: Seq[Expression] = partitionBy + + def getOrder: Expression = orderBy + + def getPreceding: Expression = preceding + + def getFollowing: Option[Expression] = following +} + +// ------------------------------------------------------------------------------------------------ +// Over windows +// ------------------------------------------------------------------------------------------------ + +/** + * Partially defined over window with partitioning. */ -case class OverWindow( - private[flink] val alias: Expression, - private[flink] val partitionBy: Seq[Expression], - private[flink] val orderBy: Expression, - private[flink] val preceding: Expression, - private[flink] val following: Expression) +class OverWindowPartitioned(partitionBy: Seq[Expression]) { + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: String): OverWindowPartitionedOrdered = { + this.orderBy(ExpressionParser.parseExpression(orderBy)) + } + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. 
+ * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = { + new OverWindowPartitionedOrdered(partitionBy, orderBy) + } +} /** - * A partially defined over window. + * Partially defined over window with (optional) partitioning and order. */ -class OverWindowWithPreceding( +class OverWindowPartitionedOrdered(partitionBy: Seq[Expression], orderBy: Expression) { + + /** + * Set the preceding offset (based on time or row-count intervals) for over window. + * + * @param preceding preceding offset relative to the current row. + * @return an over window with defined preceding + */ + def preceding(preceding: String): OverWindowPartitionedOrderedPreceding = { + this.preceding(ExpressionParser.parseExpression(preceding)) + } + + /** + * Set the preceding offset (based on time or row-count intervals) for over window. + * + * @param preceding preceding offset relative to the current row. + * @return an over window with defined preceding + */ + def preceding(preceding: Expression): OverWindowPartitionedOrderedPreceding = { + new OverWindowPartitionedOrderedPreceding(partitionBy, orderBy, preceding) + } + + /** + * Assigns an alias for this window that the following `select()` clause can refer to. + * + * @param alias alias for this over window + * @return the fully defined over window + */ + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) + + /** + * Assigns an alias for this window that the following `select()` clause can refer to. + * + * @param alias alias for this over window + * @return the fully defined over window + */ + def as(alias: Expression): OverWindow = { + new OverWindow(alias, partitionBy, orderBy, UnboundedRange(), None) + } +} + +/** + * Partially defined over window with (optional) partitioning, order, and preceding. 
+ */ +class OverWindowPartitionedOrderedPreceding( private val partitionBy: Seq[Expression], private val orderBy: Expression, private val preceding: Expression) { - private[flink] var following: Expression = _ + private var optionalFollowing: Option[Expression] = None /** * Assigns an alias for this window that the following `select()` clause can refer to. * * @param alias alias for this over window - * @return over window + * @return the fully defined over window */ def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) @@ -55,28 +151,19 @@ class OverWindowWithPreceding( * Assigns an alias for this window that the following `select()` clause can refer to. * * @param alias alias for this over window - * @return over window + * @return the fully defined over window */ def as(alias: Expression): OverWindow = { - - // set following to CURRENT_ROW / CURRENT_RANGE if not defined - if (null == following) { - if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) { - following = CURRENT_ROW - } else { - following = CURRENT_RANGE - } - } - OverWindow(alias, partitionBy, orderBy, preceding, following) + new OverWindow(alias, partitionBy, orderBy, preceding, optionalFollowing) } /** * Set the following offset (based on time or row-count intervals) for over window. * * @param following following offset that relative to the current row. - * @return this over window + * @return an over window with defined following */ - def following(following: String): OverWindowWithPreceding = { + def following(following: String): OverWindowPartitionedOrderedPreceding = { this.following(ExpressionParser.parseExpression(following)) } @@ -84,26 +171,29 @@ class OverWindowWithPreceding( * Set the following offset (based on time or row-count intervals) for over window. * * @param following following offset that relative to the current row. 
- * @return this over window + * @return an over window with defined following */ - def following(following: Expression): OverWindowWithPreceding = { - this.following = following + def following(following: Expression): OverWindowPartitionedOrderedPreceding = { + optionalFollowing = Some(following) this } } +// ------------------------------------------------------------------------------------------------ +// Group windows +// ------------------------------------------------------------------------------------------------ + /** - * A window specification. + * A group window specification. * - * Window groups rows based on time or row-count intervals. It is a general way to group the - * elements, which is very helpful for both groupBy-aggregations and over-aggregations to - * compute aggregates on groups of elements. + * Group windows group rows based on time or row-count intervals and are therefore essentially a + * special type of groupBy. Just like groupBy, group windows allow computing aggregates + * on groups of elements. * * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping * is required to apply aggregations on streaming tables. * - * For finite batch tables, window provides shortcuts for time-based groupBy. - * + * For finite batch tables, group windows provide shortcuts for time-based groupBy. 
*/ abstract class Window(val alias: Expression, val timeField: Expression) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index 82b066c..7b3088c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{OverWindow, TableEnvironment, ValidationException} import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical.{LogicalNode, Project} +import org.apache.flink.table.typeutils.RowIntervalTypeInfo import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -254,16 +255,23 @@ object ProjectionTranslator { expr match { case u: UnresolvedOverCall => - val overWindow = overWindows.find(_.alias.equals(u.alias)) - if (overWindow.isDefined) { - OverCall( - u.agg, - overWindow.get.partitionBy, - overWindow.get.orderBy, - overWindow.get.preceding, - overWindow.get.following) - } else { - u + overWindows.find(_.getAlias.equals(u.alias)) match { + case Some(overWindow) => + OverCall( + u.agg, + overWindow.getPartitioning, + overWindow.getOrder, + overWindow.getPreceding, + overWindow.getFollowing.getOrElse { + // set following to CURRENT_ROW / CURRENT_RANGE if not defined + if (overWindow.getPreceding.resultType.isInstanceOf[RowIntervalTypeInfo]) { + CurrentRow() + } else { + CurrentRange() + } + }) + + case None => u } case u: UnaryExpression =>