This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 59afddb  [FLINK-11844][table-api] Simplify over window API classes and improve documentation
59afddb is described below

commit 59afddbcd3b3737d30c48943f9fbf8445bad23e2
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Mar 6 21:45:08 2019 +0100

    [FLINK-11844][table-api] Simplify over window API classes and improve documentation
    
    This closes #7920.
---
 .../org/apache/flink/table/api/java/windows.scala  |  81 +++--------
 .../org/apache/flink/table/api/scala/windows.scala |  73 +++-------
 .../scala/org/apache/flink/table/api/windows.scala | 160 ++++++++++++++++-----
 .../flink/table/plan/ProjectionTranslator.scala    |  28 ++--
 4 files changed, 179 insertions(+), 163 deletions(-)
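
For context, the renamed builder classes below compose into one fluent chain. A minimal sketch of how the new Scala API reads after this change (table and field names such as 'a and 'rowtime are illustrative and not part of the patch):

    import org.apache.flink.table.api.OverWindow
    import org.apache.flink.table.api.scala._

    // Each call returns the next partially defined window type introduced in this commit:
    // Over.partitionBy(...) -> OverWindowPartitioned
    //   .orderBy(...)       -> OverWindowPartitionedOrdered
    //   .preceding(...)     -> OverWindowPartitionedOrderedPreceding
    //   .as(...)            -> OverWindow (fully defined)
    val w: OverWindow = Over
      .partitionBy('a)
      .orderBy('rowtime)
      .preceding(10.minutes)
      .as('w)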

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
index 121aab8..5f7f422 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -18,9 +18,8 @@
 
 package org.apache.flink.table.api.java
 
-import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE}
-import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.ExpressionParser
 
 /**
   * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
@@ -86,80 +85,36 @@ object Session {
 }
 
 /**
-  * Helper object for creating a over window.
+  * Helper class for creating an over window. Similar to SQL, over window aggregates compute an
+  * aggregate for each input row over a range of its neighboring rows.
   */
 object Over {
 
   /**
-    * Specifies the time attribute on which rows are grouped.
+    * Specifies the time attribute on which rows are ordered.
     *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
     *
     * For batch tables, refer to a timestamp or long attribute.
-    */
-  def orderBy(orderBy: String): OverWindowWithOrderBy = {
-    val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
-    *
-    * @param partitionBy some partition keys.
-    * @return A partitionedOver instance that only contains the orderBy method.
-    */
-  def partitionBy(partitionBy: String): PartitionedOver = {
-    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
-    new PartitionedOver(partitionByExpr)
-  }
-}
-
-class PartitionedOver(private val partitionByExpr: Array[Expression]) {
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * @param orderBy field reference
+    * @return an over window with defined order
     */
-  def orderBy(orderBy: String): OverWindowWithOrderBy = {
-    val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
+  def orderBy(orderBy: String): OverWindowPartitionedOrdered = {
+    new OverWindowPartitionedOrdered(Seq(), ExpressionParser.parseExpression(orderBy))
   }
-}
-
-
-class OverWindowWithOrderBy(
-  private val partitionByExpr: Array[Expression],
-  private val orderByExpr: Expression) {
 
   /**
-    * Set the preceding offset (based on time or row-count intervals) for over window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return this over window
-    */
-  def preceding(preceding: String): OverWindowWithPreceding = {
-    val precedingExpr = ExpressionParser.parseExpression(preceding)
-    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to.
+    * Partitions the elements on some partition keys.
     *
-    * @param alias alias for this over window
-    * @return over window
-    */
-  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to.
+    * Each partition is individually sorted and aggregate functions are applied to each
+    * partition separately.
     *
-    * @param alias alias for this over window
-    * @return over window
+    * @param partitionBy list of field references
+    * @return an over window with defined partitioning
     */
-  def as(alias: Expression): OverWindow = {
-    OverWindow(alias, partitionByExpr, orderByExpr, UNBOUNDED_RANGE, CURRENT_RANGE)
+  def partitionBy(partitionBy: String): OverWindowPartitioned = {
+    new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy))
   }
 }
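
The string-based helper above keeps the same chain for the Java Table API; every argument goes through ExpressionParser. A rough equivalent of the earlier sketch, written against the java.Over object (field names again illustrative):

    import org.apache.flink.table.api.OverWindow
    import org.apache.flink.table.api.java.Over

    // Plain strings are parsed into expressions, so this form is usable from Java code.
    val w: OverWindow = Over
      .partitionBy("a")
      .orderBy("rowtime")
      .preceding("10.minutes")
      .as("w")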
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 2f88248..5af1d09 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.api.scala
 
-import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.Expression
 
 /**
   * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
@@ -85,73 +85,36 @@ object Session {
 }
 
 /**
-  * Helper object for creating a over window.
+  * Helper class for creating an over window. Similar to SQL, over window aggregates compute an
+  * aggregate for each input row over a range of its neighboring rows.
   */
 object Over {
 
   /**
-    * Specifies the time attribute on which rows are grouped.
+    * Specifies the time attribute on which rows are ordered.
     *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
     *
     * For batch tables, refer to a timestamp or long attribute.
-    */
-  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
-    new OverWindowWithOrderBy(Seq[Expression](), orderBy)
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
     *
-    * @param partitionBy some partition keys.
-    * @return A partitionedOver instance that only contains the orderBy method.
+    * @param orderBy field reference
+    * @return an over window with defined order
     */
-  def partitionBy(partitionBy: Expression*): PartitionedOver = {
-    PartitionedOver(partitionBy.toArray)
+  def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
+    new OverWindowPartitionedOrdered(Seq(), orderBy)
   }
-}
-
-case class PartitionedOver(partitionBy: Array[Expression]) {
 
   /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    */
-  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
-    OverWindowWithOrderBy(partitionBy, orderBy)
-  }
-}
-
-case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) {
-
-  /**
-    * Set the preceding offset (based on time or row-count intervals) for over window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return this over window
-    */
-  def preceding(preceding: Expression): OverWindowWithPreceding = {
-    new OverWindowWithPreceding(partitionBy, orderBy, preceding)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to.
+    * Partitions the elements on some partition keys.
     *
-    * @param alias alias for this over window
-    * @return over window
-    */
-  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to.
+    * Each partition is individually sorted and aggregate functions are applied to each
+    * partition separately.
     *
-    * @param alias alias for this over window
-    * @return over window
+    * @param partitionBy list of field references
+    * @return an over window with defined partitioning
     */
-  def as(alias: Expression): OverWindow = {
-    OverWindow(alias, partitionBy, orderBy, UNBOUNDED_RANGE, CURRENT_RANGE)
+  def partitionBy(partitionBy: Expression*): OverWindowPartitioned = {
+    new OverWindowPartitioned(partitionBy)
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
index 960619e..5dd6a4c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -20,34 +20,130 @@ package org.apache.flink.table.api
 
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
 
 /**
-  * Over window is similar to the traditional OVER SQL.
+  * An over window specification.
+  *
+  * Similar to SQL, over window aggregates compute an aggregate for each input row over a range
+  * of its neighboring rows.
+  */
+class OverWindow(
+    alias: Expression,
+    partitionBy: Seq[Expression],
+    orderBy: Expression,
+    preceding: Expression,
+    following: Option[Expression]) {
+
+  def getAlias: Expression = alias
+
+  def getPartitioning: Seq[Expression] = partitionBy
+
+  def getOrder: Expression = orderBy
+
+  def getPreceding: Expression = preceding
+
+  def getFollowing: Option[Expression] = following
+}
+
+// ------------------------------------------------------------------------------------------------
+// Over windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Partially defined over window with partitioning.
   */
-case class OverWindow(
-    private[flink] val alias: Expression,
-    private[flink] val partitionBy: Seq[Expression],
-    private[flink] val orderBy: Expression,
-    private[flink] val preceding: Expression,
-    private[flink] val following: Expression)
+class OverWindowPartitioned(partitionBy: Seq[Expression]) {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderBy field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderBy: String): OverWindowPartitionedOrdered = {
+    this.orderBy(ExpressionParser.parseExpression(orderBy))
+  }
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderBy field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
+    new OverWindowPartitionedOrdered(partitionBy, orderBy)
+  }
+}
 
 /**
-  * A partially defined over window.
+  * Partially defined over window with (optional) partitioning and order.
   */
-class OverWindowWithPreceding(
+class OverWindowPartitionedOrdered(partitionBy: Seq[Expression], orderBy: Expression) {
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return an over window with defined preceding
+    */
+  def preceding(preceding: String): OverWindowPartitionedOrderedPreceding = {
+    this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return an over window with defined preceding
+    */
+  def preceding(preceding: Expression): OverWindowPartitionedOrderedPreceding = {
+    new OverWindowPartitionedOrderedPreceding(partitionBy, orderBy, preceding)
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return the fully defined over window
+    */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return the fully defined over window
+    */
+  def as(alias: Expression): OverWindow = {
+    new OverWindow(alias, partitionBy, orderBy, UnboundedRange(), None)
+  }
+}
+
+/**
+  * Partially defined over window with (optional) partitioning, order, and preceding.
+  */
+class OverWindowPartitionedOrderedPreceding(
     private val partitionBy: Seq[Expression],
     private val orderBy: Expression,
     private val preceding: Expression) {
 
-  private[flink] var following: Expression = _
+  private var optionalFollowing: Option[Expression] = None
 
   /**
     * Assigns an alias for this window that the following `select()` clause can refer to.
     *
     * @param alias alias for this over window
-    * @return over window
+    * @return the fully defined over window
     */
   def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
 
@@ -55,28 +151,19 @@ class OverWindowWithPreceding(
     * Assigns an alias for this window that the following `select()` clause can refer to.
     *
     * @param alias alias for this over window
-    * @return over window
+    * @return the fully defined over window
     */
   def as(alias: Expression): OverWindow = {
-
-    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
-    if (null == following) {
-      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
-        following = CURRENT_ROW
-      } else {
-        following = CURRENT_RANGE
-      }
-    }
-    OverWindow(alias, partitionBy, orderBy, preceding, following)
+    new OverWindow(alias, partitionBy, orderBy, preceding, optionalFollowing)
   }
 
   /**
     * Set the following offset (based on time or row-count intervals) for over window.
     *
     * @param following following offset that relative to the current row.
-    * @return this over window
+    * @return an over window with defined following
     */
-  def following(following: String): OverWindowWithPreceding = {
+  def following(following: String): OverWindowPartitionedOrderedPreceding = {
     this.following(ExpressionParser.parseExpression(following))
   }
 
@@ -84,26 +171,29 @@ class OverWindowWithPreceding(
     * Set the following offset (based on time or row-count intervals) for over window.
     *
     * @param following following offset that relative to the current row.
-    * @return this over window
+    * @return an over window with defined following
     */
-  def following(following: Expression): OverWindowWithPreceding = {
-    this.following = following
+  def following(following: Expression): OverWindowPartitionedOrderedPreceding = {
+    optionalFollowing = Some(following)
     this
   }
 }
 
+// ------------------------------------------------------------------------------------------------
+// Group windows
+// ------------------------------------------------------------------------------------------------
+
 /**
-  * A window specification.
+  * A group window specification.
   *
-  * Window groups rows based on time or row-count intervals. It is a general way to group the
-  * elements, which is very helpful for both groupBy-aggregations and over-aggregations to
-  * compute aggregates on groups of elements.
+  * Group windows group rows based on time or row-count intervals and is therefore essentially a
+  * special type of groupBy. Just like groupBy, group windows allow to compute aggregates
+  * on groups of elements.
   *
   * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
   * is required to apply aggregations on streaming tables.
   *
-  * For finite batch tables, window provides shortcuts for time-based groupBy.
-  *
+  * For finite batch tables, group windows provide shortcuts for time-based groupBy.
   */
 abstract class Window(val alias: Expression, val timeField: Expression) {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 82b066c..7b3088c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{OverWindow, TableEnvironment, ValidationException}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical.{LogicalNode, Project}
+import org.apache.flink.table.typeutils.RowIntervalTypeInfo
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -254,16 +255,23 @@ object ProjectionTranslator {
 
     expr match {
       case u: UnresolvedOverCall =>
-        val overWindow = overWindows.find(_.alias.equals(u.alias))
-        if (overWindow.isDefined) {
-          OverCall(
-            u.agg,
-            overWindow.get.partitionBy,
-            overWindow.get.orderBy,
-            overWindow.get.preceding,
-            overWindow.get.following)
-        } else {
-          u
+        overWindows.find(_.getAlias.equals(u.alias)) match {
+          case Some(overWindow) =>
+            OverCall(
+              u.agg,
+              overWindow.getPartitioning,
+              overWindow.getOrder,
+              overWindow.getPreceding,
+              overWindow.getFollowing.getOrElse {
+                // set following to CURRENT_ROW / CURRENT_RANGE if not defined
+                if (overWindow.getPreceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
+                  CurrentRow()
+                } else {
+                  CurrentRange()
+                }
+              })
+
+          case None => u
         }
 
       case u: UnaryExpression =>
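
Putting the pieces together: an over window built with the new classes is still consumed by Table.window(...).select(...), and when no following clause is given the translator above falls back to CurrentRow() for row-count precedings and CurrentRange() for time precedings. A sketch under assumed names (a Table called orders with a rowtime attribute, a user key, and a numeric amount field):

    import org.apache.flink.table.api.scala._

    // Row-count interval: the implicit following is CURRENT ROW.
    val rowBased = orders
      .window(Over partitionBy 'user orderBy 'rowtime preceding 10.rows as 'w)
      .select('user, 'amount.sum over 'w)

    // Time interval: the implicit following is CURRENT RANGE.
    val timeBased = orders
      .window(Over partitionBy 'user orderBy 'rowtime preceding 10.minutes as 'w)
      .select('user, 'amount.avg over 'w)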
