This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 44c2474  [FLINK-11918][table] Deprecated some Window APIs and Rename 
Window to GroupWindow
44c2474 is described below

commit 44c24749998120695e43e7dbea3aa7b6bc8b00bf
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Thu Mar 14 18:06:19 2019 +0800

    [FLINK-11918][table] Deprecated some Window APIs and Rename Window to 
GroupWindow
    
    This closes #7985
---
 docs/dev/table/tableApi.md                         |  14 +-
 .../org/apache/flink/table/api/java/windows.scala  |  88 ++-------
 .../org/apache/flink/table/api/scala/windows.scala |  90 +++------
 .../scala/org/apache/flink/table/api/table.scala   |  57 +++++-
 .../scala/org/apache/flink/table/api/windows.scala | 217 ++++++++++++++++++++-
 .../table/api/batch/table/GroupWindowTest.scala    |   1 +
 .../validation/GroupWindowValidationTest.scala     |   2 +-
 .../validation/OverWindowValidationTest.scala      |   2 +-
 .../table/api/stream/table/AggregateTest.scala     |   1 +
 .../flink/table/api/stream/table/CalcTest.scala    |   1 +
 .../table/api/stream/table/GroupWindowTest.scala   |   1 +
 .../table/api/stream/table/OverWindowTest.scala    |   2 +-
 .../table/api/stream/table/TableSourceTest.scala   |   2 +-
 .../stringexpr/AggregateStringExpressionTest.scala |   6 +-
 .../GroupWindowStringExpressionTest.scala          |  14 +-
 .../OverWindowStringExpressionTest.scala           |  48 ++---
 .../table/validation/CalcValidationTest.scala      |   4 +-
 .../validation/GroupWindowValidationTest.scala     |   2 +-
 .../validation/OverWindowValidationTest.scala      |   2 +-
 .../flink/table/plan/RetractionRulesTest.scala     |   2 +-
 .../table/plan/TimeIndicatorConversionTest.scala   |   1 +
 .../flink/table/plan/UpdatingPlanCheckerTest.scala |   2 +-
 .../runtime/batch/table/GroupWindowITCase.scala    |   1 +
 .../runtime/batch/table/TableSourceITCase.scala    |   2 +-
 .../runtime/stream/TimeAttributesITCase.scala      |   2 +-
 .../runtime/stream/table/GroupWindowITCase.scala   |   2 +-
 .../table/runtime/stream/table/JoinITCase.scala    |   2 +-
 .../runtime/stream/table/OverWindowITCase.scala    |   1 +
 .../runtime/stream/table/TableSinkITCase.scala     |   2 +-
 .../runtime/stream/table/TableSourceITCase.scala   |   2 +-
 30 files changed, 376 insertions(+), 197 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a509d85..f4c74cc 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1271,14 +1271,14 @@ orders.insertInto("OutOrders")
 
 Group window aggregates group rows into finite groups based on time or 
row-count intervals and evaluate aggregation functions once per group. For 
batch tables, windows are a convenient shortcut to group records by time 
intervals.
 
-Windows are defined using the `window(w: Window)` clause and require an alias, 
which is specified using the `as` clause. In order to group a table by a 
window, the window alias must be referenced in the `groupBy(...)` clause like a 
regular grouping attribute. 
+Windows are defined using the `window(w: GroupWindow)` clause and require an 
alias, which is specified using the `as` clause. In order to group a table by a 
window, the window alias must be referenced in the `groupBy(...)` clause like a 
regular grouping attribute. 
 The following example shows how to define a window aggregation on a table.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .window([Window w].as("w"))  // define window with alias w
+  .window([GroupWindow w].as("w"))  // define window with alias w
   .groupBy("w")  // group the table by window w
   .select("b.sum");  // aggregate
 {% endhighlight %}
@@ -1287,7 +1287,7 @@ Table table = input
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .window([w: Window] as 'w)  // define window with alias w
+  .window([w: GroupWindow] as 'w)  // define window with alias w
   .groupBy('w)   // group the table by window w
   .select('b.sum)  // aggregate
 {% endhighlight %}
@@ -1301,7 +1301,7 @@ The following example shows how to define a window 
aggregation with additional g
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .window([Window w].as("w"))  // define window with alias w
+  .window([GroupWindow w].as("w"))  // define window with alias w
   .groupBy("w, a")  // group the table by attribute a and window w 
   .select("a, b.sum");  // aggregate
 {% endhighlight %}
@@ -1310,7 +1310,7 @@ Table table = input
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .window([w: Window] as 'w) // define window with alias w
+  .window([w: GroupWindow] as 'w) // define window with alias w
   .groupBy('w, 'a)  // group the table by attribute a and window w 
   .select('a, 'b.sum)  // aggregate
 {% endhighlight %}
@@ -1323,7 +1323,7 @@ Window properties such as the start, end, or rowtime 
timestamp of a time window
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Table table = input
-  .window([Window w].as("w"))  // define window with alias w
+  .window([GroupWindow w].as("w"))  // define window with alias w
   .groupBy("w, a")  // group the table by attribute a and window w 
   .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add 
window start, end, and rowtime timestamps
 {% endhighlight %}
@@ -1332,7 +1332,7 @@ Table table = input
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val table = input
-  .window([w: Window] as 'w)  // define window with alias w
+  .window([w: GroupWindow] as 'w)  // define window with alias w
   .groupBy('w, 'a)  // group the table by attribute a and window w 
   .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add 
window start, end, and rowtime timestamps
 {% endhighlight %}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
index 121aab8..8cfeca8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -19,26 +19,19 @@
 package org.apache.flink.table.api.java
 
 import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE}
-import org.apache.flink.table.api.{OverWindow, TumbleWithSize, 
OverWindowWithPreceding, SlideWithSize, SessionWithGap}
+import org.apache.flink.table.api.{OverWindow, OverWindowWithPreceding}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.api._
 
 /**
   * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
   * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
   * elements in 5 minutes intervals.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Tumble]].
   */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are consecutive, 
non-overlapping
-    * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a partially defined tumbling window
-    */
-  def over(size: String): TumbleWithSize = new TumbleWithSize(size)
-}
+@Deprecated
+object Tumble extends TumbleBase
 
 /**
   * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by
@@ -48,71 +41,30 @@ object Tumble {
   * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
   * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
   * window evaluations.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Slide]].
   */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
-    * consecutive window evaluations.
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: String): SlideWithSize = new SlideWithSize(size)
-}
+@Deprecated
+object Slide extends SlideBase
 
 /**
   * Helper class for creating a session window. The boundary of session 
windows are defined by
   * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
   * gap period.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Session]].
   */
-object Session {
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
-    *            closing the session window.
-    * @return a partially defined session window
-    */
-  def withGap(gap: String): SessionWithGap = new SessionWithGap(gap)
-}
+@Deprecated
+object Session extends SessionBase
 
 /**
-  * Helper object for creating a over window.
+  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
+  * aggregate for each input row over a range of its neighboring rows.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Over]].
   */
-object Over {
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to 
specify time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    */
-  def orderBy(orderBy: String): OverWindowWithOrderBy = {
-    val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
-    *
-    * @param partitionBy some partition keys.
-    * @return A partitionedOver instance that only contains the orderBy method.
-    */
-  def partitionBy(partitionBy: String): PartitionedOver = {
-    val partitionByExpr = 
ExpressionParser.parseExpressionList(partitionBy).toArray
-    new PartitionedOver(partitionByExpr)
-  }
-}
+@Deprecated
+object Over extends OverBase
 
 class PartitionedOver(private val partitionByExpr: Array[Expression]) {
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 2f88248..1172822 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,26 +18,20 @@
 
 package org.apache.flink.table.api.scala
 
-import org.apache.flink.table.api.{OverWindow, TumbleWithSize, 
OverWindowWithPreceding, SlideWithSize, SessionWithGap}
+import org.apache.flink.table.api.{OverWindow, OverWindowWithPreceding}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.api._
 
 /**
   * Helper object for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
   * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
   * elements in 5 minutes intervals.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Tumble]].
   */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, 
non-overlapping
-    * windows. For example, a tumbling window of 5 minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a partially defined tumbling window
-    */
-  def over(size: Expression): TumbleWithSize = new TumbleWithSize(size)
-}
+@deprecated(
+  "This class will be replaced by org.apache.flink.table.api.Tumble.", "1.8")
+object Tumble extends TumbleBase
 
 /**
   * Helper object for creating a sliding window. Sliding windows have a fixed 
size and slide by
@@ -47,69 +41,33 @@ object Tumble {
   * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
   * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
   * window evaluations.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Slide]].
   */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
-    * consecutive
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
-}
+@deprecated(
+  "This class will be replaced by org.apache.flink.table.api.Slide.", "1.8")
+object Slide extends SlideBase
 
 /**
   * Helper object for creating a session window. The boundary of session 
windows are defined by
   * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
   * gap period.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Session]].
   */
-object Session {
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
-    *            closing the session window.
-    * @return a partially defined session window
-    */
-  def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap)
-}
+@deprecated(
+  "This class will be replaced by org.apache.flink.table.api.Session.", "1.8")
+object Session extends SessionBase
 
 /**
-  * Helper object for creating a over window.
+  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
+  * aggregate for each input row over a range of its neighboring rows.
+  *
+  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Over]].
   */
-object Over {
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to 
specify time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    */
-  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
-    new OverWindowWithOrderBy(Seq[Expression](), orderBy)
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
-    *
-    * @param partitionBy some partition keys.
-    * @return A partitionedOver instance that only contains the orderBy method.
-    */
-  def partitionBy(partitionBy: Expression*): PartitionedOver = {
-    PartitionedOver(partitionBy.toArray)
-  }
-}
+@deprecated(
+  "This class will be replaced by org.apache.flink.table.api.Over.", "1.8")
+object Over extends OverBase
 
 case class PartitionedOver(partitionBy: Array[Expression]) {
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
index e2f1161..4e7b918 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
@@ -1165,12 +1165,40 @@ class Table(
     *
     * @param window window that specifies how elements are grouped.
     * @return A windowed table.
+    *
+    * @deprecated Will be removed in a future release. Please use 
Table.window(window: GroupWindow)
+    *             instead.
     */
+  @deprecated(
+    "This method will be removed. Please use Table.window(window: GroupWindow) 
instead.",
+    "1.8")
+  @Deprecated
   def window(window: Window): WindowedTable = {
     new WindowedTable(this, window)
   }
 
   /**
+    * Groups the records of a table by assigning them to windows defined by a 
time or row interval.
+    *
+    * For streaming tables of infinite size, grouping into windows is required 
to define finite
+    * groups on which group-based aggregates can be computed.
+    *
+    * For batch tables of finite size, windowing essentially provides 
shortcuts for time-based
+    * groupBy.
+    *
+    * __Note__: Computing windowed aggregates on a streaming table is only a 
parallel operation
+    * if additional grouping attributes are added to the `groupBy(...)` clause.
+    * If the `groupBy(...)` only references a window alias, the streamed table 
will be processed
+    * by a single task, i.e., with parallelism 1.
+    *
+    * @param window group window that specifies how elements are grouped.
+    * @return A group windowed table.
+    */
+  def window(window: GroupWindow): GroupWindowedTable = {
+    new GroupWindowedTable(this, window)
+  }
+
+  /**
     * Defines over-windows on the records of a table.
     *
     * An over-window defines for each record an interval of records over which 
aggregation
@@ -1291,6 +1319,14 @@ class GroupedTable(
   }
 }
 
+/**
+  * A table that has been windowed for grouping [[Window]]s.
+  *
+  * @deprecated Will be replaced by [[GroupWindowedTable]].
+  */
+@Deprecated
+@deprecated(
+  "This class will be replaced by GroupWindowedTable.", "1.8")
 class WindowedTable(
     private[flink] val table: Table,
     private[flink] val window: Window) {
@@ -1343,6 +1379,15 @@ class WindowedTable(
 
 }
 
+/**
+  * A table that has been windowed for [[GroupWindow]]s.
+  */
+class GroupWindowedTable(
+    override private[flink] val table: Table,
+    override private[flink] val window: GroupWindow)
+  extends WindowedTable(table, window)
+
+
 class OverWindowedTable(
     private[flink] val table: Table,
     private[flink] val overWindows: Array[OverWindow]) {
@@ -1379,11 +1424,21 @@ class OverWindowedTable(
   }
 }
 
-class WindowGroupedTable(
+/**
+  * A table that has been windowed and grouped for [[GroupWindow]]s.
+  *
+  * @deprecated The constructor contains [[Window]] parameter will be removed. 
Use constructor
+  *             with [[GroupWindow]] instead.
+  */
+class WindowGroupedTable @Deprecated() (
     private[flink] val table: Table,
     private[flink] val groupKeys: Seq[Expression],
     private[flink] val window: Window) {
 
+  def this(table: Table, groupKeys: Seq[Expression], window: GroupWindow) {
+    this(table, groupKeys, window.asInstanceOf[Window])
+  }
+
   /**
     * Performs a selection operation on a window grouped table. Similar to an 
SQL SELECT statement.
     * The field expressions can contain complex expressions and aggregations.
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
index ee022b1..57f382c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -18,10 +18,11 @@
 
 package org.apache.flink.table.api
 
+import org.apache.flink.table.api.java.{PartitionedOver => JPartitionedOver, 
OverWindowWithOrderBy => JOverWindowWithOrderBy}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
-import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
+import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW, 
OverWindowWithOrderBy, PartitionedOver}
 
 /**
   * Over window is similar to the traditional OVER SQL.
@@ -134,9 +135,13 @@ class OverWindowWithPreceding(
   * Infinite streaming tables can only be grouped into time or row intervals. 
Hence window grouping
   * is required to apply aggregations on streaming tables.
   *
-  * For finite batch tables, window provides shortcuts for time-based groupBy.
+  * For finite batch tables, group windows provide shortcuts for time-based 
groupBy.
   *
+  * @deprecated Will be replaced by [[GroupWindow]]
   */
+@Deprecated
+@deprecated(
+  "This class will be replaced by GroupWindow.", "1.8")
 abstract class Window(val alias: Expression, val timeField: Expression) {
 
   /**
@@ -146,6 +151,21 @@ abstract class Window(val alias: Expression, val 
timeField: Expression) {
 
 }
 
+/**
+  * A group window specification.
+  *
+  * Group windows group rows based on time or row-count intervals and is 
therefore essentially a
+  * special type of groupBy. Just like groupBy, group windows allow to compute 
aggregates
+  * on groups of elements.
+  *
+  * Infinite streaming tables can only be grouped into time or row intervals. 
Hence window grouping
+  * is required to apply aggregations on streaming tables.
+  *
+  * For finite batch tables, group windows provide shortcuts for time-based 
groupBy.
+  */
+abstract class GroupWindow(alias: Expression, timeField: Expression)
+  extends Window(alias, timeField)
+
 // 
------------------------------------------------------------------------------------------------
 // Tumbling windows
 // 
------------------------------------------------------------------------------------------------
@@ -234,7 +254,7 @@ class TumbleWithSizeOnTimeWithAlias(
     alias: Expression,
     timeField: Expression,
     size: Expression)
-  extends Window(
+  extends GroupWindow(
     alias,
     timeField) {
 
@@ -369,7 +389,7 @@ class SlideWithSizeAndSlideOnTimeWithAlias(
     timeField: Expression,
     size: Expression,
     slide: Expression)
-  extends Window(
+  extends GroupWindow(
     alias,
     timeField) {
 
@@ -469,7 +489,7 @@ class SessionWithGapOnTimeWithAlias(
     alias: Expression,
     timeField: Expression,
     gap: Expression)
-  extends Window(
+  extends GroupWindow(
     alias,
     timeField) {
 
@@ -480,3 +500,190 @@ class SessionWithGapOnTimeWithAlias(
     SessionGroupWindow(alias, timeField, gap)
   }
 }
+
+/**
+  * Base class for Tumble Window Helper classes. This class contains help 
methods to create Tumble
+  * Windows.
+  */
+class TumbleBase {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are consecutive, 
non-overlapping
+    * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
+    * elements in 5 minutes intervals.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a partially defined tumbling window
+    */
+  def over(size: String): TumbleWithSize = new TumbleWithSize(size)
+
+  /**
+    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, 
non-overlapping
+    * windows. For example, a tumbling window of 5 minutes size groups
+    * elements in 5 minutes intervals.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a partially defined tumbling window
+    */
+  def over(size: Expression): TumbleWithSize = new TumbleWithSize(size)
+}
+
+/**
+  * Base class for Slide Window Helper classes. This class contains help 
methods to create Slide
+  * Windows.
+  */
+class SlideBase {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
+    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
+    *
+    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
+    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: String): SlideWithSize = new SlideWithSize(size)
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
+    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
+    *
+    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
+    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
+    * consecutive
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Base class for Session Window Helper classes. This class contains help 
methods to create Session
+  * Windows.
+  */
+class SessionBase {
+
+  /**
+    * Creates a session window. The boundary of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
+    *            closing the session window.
+    * @return a partially defined session window
+    */
+  def withGap(gap: String): SessionWithGap = new SessionWithGap(gap)
+
+  /**
+    * Creates a session window. The boundary of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
+    *            closing the session window.
+    * @return a partially defined session window
+    */
+  def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap)
+}
+
+/**
+  * Base class for Over Window Helper classes. This class contains help 
methods to create Over
+  * Windows.
+  */
+class OverBase {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderBy field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderBy: String): JOverWindowWithOrderBy = {
+    val orderByExpr = ExpressionParser.parseExpression(orderBy)
+    new JOverWindowWithOrderBy(Array[Expression](), orderByExpr)
+  }
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderBy field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
+    OverWindowWithOrderBy(Seq[Expression](), orderBy)
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * Each partition is individually sorted and aggregate functions are 
applied to each
+    * partition separately.
+    *
+    * @param partitionBy list of field references
+    * @return an over window with defined partitioning
+    */
+  def partitionBy(partitionBy: String): JPartitionedOver = {
+    val partitionByExpr = 
ExpressionParser.parseExpressionList(partitionBy).toArray
+    new JPartitionedOver(partitionByExpr)
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * Each partition is individually sorted and aggregate functions are 
applied to each
+    * partition separately.
+    *
+    * @param partitionBy list of field references
+    * @return an over window with defined partitioning
+    */
+  def partitionBy(partitionBy: Expression*): PartitionedOver = {
+    PartitionedOver(partitionBy.toArray)
+  }
+}
+
+/**
+  * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
+  * elements in 5 minutes intervals.
+  */
+object Tumble extends TumbleBase
+
+/**
+  * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by
+  * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
+  * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
+  *
+  * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
+  * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
+  * window evaluations.
+  */
+object Slide extends SlideBase
+
+/**
+  * Helper class for creating a session window. The boundary of session 
windows are defined by
+  * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
+  * gap period.
+  */
+object Session extends SessionBase
+
+/**
+  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
+  * aggregate for each input row over a range of its neighboring rows.
+  */
+object Over extends OverBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index 27c1d7f..46f35bc 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.batch.table
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.WindowReference
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
index 7cf1b82..15d7058 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.batch.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
index df8f7df..b19a523 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.batch.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Tumble, ValidationException}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
index afa9f8b..749120d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
index 5298ea4..84e685e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.Tumble
 import org.apache.flink.table.expressions.WindowReference
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.utils.TableTestBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index a59ad83..ae533ee 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, 
WeightedAvgWithMerge}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.WindowReference
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
index eeb7d5f..7603a8c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.Func1
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.{Over, Table}
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index 5e3a410..c6ba354 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{Over, TableSchema, Tumble, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{TableTestBase, 
TestNestedProjectableTableSource, TestProjectableTableSource, 
TestTableSourceWithTime}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
index 0833c24..63c7866 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Tumble
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.java.{Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, 
WeightedAvgWithMergeAndReset}
 import org.apache.flink.table.utils.TableTestBase
@@ -143,7 +143,7 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     // String / Java API
     val resJava = t
-      .window(JTumble.over("50.milli").on("proctime").as("w1"))
+      .window(Tumble.over("50.milli").on("proctime").as("w1"))
       .groupBy("w1, string")
       .select("w1.proctime as proctime, w1.start as start, w1.end as end, 
string, int.count")
 
@@ -163,7 +163,7 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     // String / Java API
     val resJava = t
-      .window(JTumble.over("50.milli").on("rowtime").as("w1"))
+      .window(Tumble.over("50.milli").on("rowtime").as("w1"))
       .groupBy("w1, string")
       .select("w1.rowtime as rowtime, string, int.count")
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index 2cb5a8a..2e05139 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import org.apache.flink.table.api.scala._
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.{Session => JSession, Slide => JSlide, 
Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
@@ -53,7 +53,7 @@ class GroupWindowStringExpressionTest extends TableTestBase {
 
     // String / Java API
     val resJava = t
-      .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w"))
+      .window(Slide.over("4.hours").every("2.hours").on("rowtime").as("w"))
       .groupBy("w, string")
       .select(
         "string, " +
@@ -92,7 +92,7 @@ class GroupWindowStringExpressionTest extends TableTestBase {
 
     // String / Java API
     val resJava = t
-      .window(JTumble.over("4.hours").on("rowtime").as("w"))
+      .window(Tumble.over("4.hours").on("rowtime").as("w"))
       .groupBy("w, string")
       .select(
         "string, " +
@@ -130,7 +130,7 @@ class GroupWindowStringExpressionTest extends TableTestBase 
{
 
     // String / Java API
     val resJava = t
-      .window(JSession.withGap("4.hours").on("rowtime").as("w"))
+      .window(Session.withGap("4.hours").on("rowtime").as("w"))
       .groupBy("w, string")
       .select(
         "string, " +
@@ -167,7 +167,7 @@ class GroupWindowStringExpressionTest extends TableTestBase 
{
 
     // String / Java API
     val resJava = t
-      .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w"))
+      .window(Slide.over("4.hours").every("2.hours").on("proctime").as("w"))
       .groupBy("w")
       .select(
         "myCountFun(string), " +
@@ -204,7 +204,7 @@ class GroupWindowStringExpressionTest extends TableTestBase 
{
 
     // String / Java API
     val resJava = t
-      .window(JTumble.over("4.hours").on("proctime").as("w"))
+      .window(Tumble.over("4.hours").on("proctime").as("w"))
       .groupBy("w")
       .select(
         "myCountFun(string), " +
@@ -241,7 +241,7 @@ class GroupWindowStringExpressionTest extends TableTestBase 
{
 
     // String / Java API
     val resJava = t
-      .window(JSession.withGap("4.hours").on("proctime").as("w"))
+      .window(Session.withGap("4.hours").on("proctime").as("w"))
       .groupBy("w")
       .select(
         "myCountFun(string), " +
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
index 99114b8..f3b312b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Over
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.{Over => JOver}
-import org.apache.flink.table.api.scala.{Over => SOver, _}
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.Func1
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
@@ -38,10 +38,10 @@ class OverWindowStringExpressionTest extends TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 
'w)
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 
'w)
       .select('a, 'b.sum over 'w as 'cnt, weightAvgFun('a, 'b) over 'w as 
'myCnt)
     val resJava = t
-      
.window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
+      
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
       .select("a, SUM(b) OVER w as cnt, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -56,10 +56,10 @@ class OverWindowStringExpressionTest extends TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following 
CURRENT_ROW as 'w)
+      .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW following 
CURRENT_ROW as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      
.window(JOver.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w"))
+      
.window(Over.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -74,10 +74,10 @@ class OverWindowStringExpressionTest extends TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 
'w)
+      .window(Over partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 
'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      .window(JOver.partitionBy("a, 
d").orderBy("rowtime").preceding("10.rows").as("w"))
+      .window(Over.partitionBy("a, 
d").orderBy("rowtime").preceding("10.rows").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -92,10 +92,10 @@ class OverWindowStringExpressionTest extends TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW 
as 'w)
+      .window(Over orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 
'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      
.window(JOver.orderBy("rowtime").preceding("10.rows").following("current_row").as("w"))
+      
.window(Over.orderBy("rowtime").preceding("10.rows").following("current_row").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -110,10 +110,10 @@ class OverWindowStringExpressionTest extends 
TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE 
as 'w)
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE 
as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      
.window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"))
+      
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -128,15 +128,15 @@ class OverWindowStringExpressionTest extends 
TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following 
CURRENT_RANGE as 'w)
+      .window(Over orderBy 'rowtime preceding UNBOUNDED_RANGE following 
CURRENT_RANGE as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(
-        
JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
+        
Over.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
     val resJava2 = t
       .window(
-        JOver.orderBy("rowtime").as("w"))
+        Over.orderBy("rowtime").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -152,15 +152,15 @@ class OverWindowStringExpressionTest extends 
TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following 
CURRENT_RANGE as 'w)
+      .window(Over orderBy 'proctime preceding UNBOUNDED_RANGE following 
CURRENT_RANGE as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
       .window(
-        
JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w"))
+        
Over.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
     val resJava2 = t
       .window(
-        JOver.orderBy("proctime").as("w"))
+        Over.orderBy("proctime").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -176,10 +176,10 @@ class OverWindowStringExpressionTest extends 
TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes 
as 'w)
+      .window(Over partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes 
as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      .window(JOver.partitionBy("a, 
c").orderBy("rowtime").preceding("10.minutes").as("w"))
+      .window(Over.partitionBy("a, 
c").orderBy("rowtime").preceding("10.minutes").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -194,10 +194,10 @@ class OverWindowStringExpressionTest extends 
TableTestBase {
     util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
 
     val resScala = t
-      .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE 
as 'w)
+      .window(Over orderBy 'rowtime preceding 4.hours following CURRENT_RANGE 
as 'w)
       .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
     val resJava = t
-      
.window(JOver.orderBy("rowtime").preceding("4.hours").following("current_range").as("w"))
+      
.window(Over.orderBy("rowtime").preceding("4.hours").following("current_range").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
@@ -214,7 +214,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
     util.addFunction("weightedAvg", weightedAvg)
 
     val resScala = t
-      .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 
'w)
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 
'w)
       .select(
         array('a.sum over 'w, 'a.count over 'w),
         plusOne('b.sum over 'w as 'wsum) as 'd,
@@ -223,7 +223,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
         "AVG:".toExpr + (weightedAvg('a, 'b) over 'w))
 
     val resJava = t
-      
.window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
+      
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
       .select(
         s"""
            |ARRAY(SUM(a) OVER w, COUNT(a) OVER w),
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index 6e0e15e..03438c8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.stream.table.validation
 import java.math.BigDecimal
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.scala.{Tumble, _}
+import org.apache.flink.table.api.{Tumble, ValidationException}
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala
index 8d5943d..5a507a4 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.stream.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
index 37e1526..350ad06 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table.validation
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.api.{Over, Table, Tumble, ValidationException}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, 
WeightedAvgWithRetract}
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index eb9d4b2..5b4f364 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan
 
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.{Table, Tumble}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.nodes.datastream._
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.CountDistinct
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 095cd04..7c00354 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan
 import java.sql.Timestamp
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Tumble
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
index bd2a868..a57cabd 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan
 
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api.{Table, Tumble}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.utils.StreamTableTestUtil
 import org.junit.Assert._
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
index 0b6470f..80886bc 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.batch.table
 import java.math.BigDecimal
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase
 import 
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index a0f6c53..b152dcb 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, 
RowTypeInfo}
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment => JExecEnv}
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types}
 import 
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{CommonTestData, 
TableProgramsCollectionTestBase}
 import org.apache.flink.table.sources.BatchTableSource
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 2fb1fcd..a81b3ed 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -30,7 +30,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{TableSchema, Tumble, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import 
org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark,
 TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index c7a71e4..d9b7b50 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.api.{Session, Slide, StreamQueryConfig, Tumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 926319f..6341620 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.api.{StreamQueryConfig, Tumble, Types}
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils.Func20
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
index c0a95ab..bb1c813 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.Over
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 CountDistinctWithRetractAndReset, WeightedAvg}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0
 import org.apache.flink.table.api.scala._
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 84d8d21..8080082 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.api.{TableException, Tumble, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index 52df3d5..eb571d5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -30,7 +30,7 @@ import 
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types}
 import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.utils._

Reply via email to