[FLINK-6392] [table] Improve group window API to make alias mandatory.

This closes #3786.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb972b85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb972b85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb972b85

Branch: refs/heads/master
Commit: bb972b85a76d9b98ca5e90033e448f68f9109485
Parents: 69905e1
Author: sunjincheng121 <sunjincheng...@gmail.com>
Authored: Thu Apr 27 09:58:18 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Apr 27 17:04:00 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/api/java/windows.scala   |  18 +-
 .../apache/flink/table/api/scala/windows.scala  |  18 +-
 .../org/apache/flink/table/api/table.scala      |   5 +-
 .../org/apache/flink/table/api/windows.scala    | 297 ++++++++++++-------
 .../table/plan/logical/LogicalWindow.scala      |   7 +-
 .../flink/table/plan/logical/groupWindows.scala |  28 +-
 .../flink/table/plan/logical/operators.scala    |   2 +-
 .../common/WindowStartEndPropertiesRule.scala   |   4 +-
 .../scala/batch/sql/WindowAggregateTest.scala   |  12 +-
 .../scala/batch/table/FieldProjectionTest.scala |   4 +-
 .../api/scala/batch/table/GroupWindowTest.scala |  18 +-
 .../scala/stream/sql/WindowAggregateTest.scala  |   6 +-
 .../scala/stream/table/GroupWindowTest.scala    |  48 +--
 13 files changed, 274 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
index 1e9d65a..15208ce 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.api.java
 
-import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, 
SlideWithSize, TumblingWindow}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, 
SlideWithSize, SessionWithGap}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 
 /**
@@ -34,9 +34,9 @@ object Tumble {
     * elements in 5 minutes intervals.
     *
     * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
+    * @return a partially defined tumbling window
     */
-  def over(size: String): TumblingWindow = new TumblingWindow(size)
+  def over(size: String): TumbleWithSize = new TumbleWithSize(size)
 }
 
 /**
@@ -79,9 +79,9 @@ object Session {
     *
     * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
     *            closing the session window.
-    * @return a session window
+    * @return a partially defined session window
     */
-  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+  def withGap(gap: String): SessionWithGap = new SessionWithGap(gap)
 }
 
 /**
@@ -96,9 +96,9 @@ object Over {
     *
     * For batch tables, refer to a timestamp or long attribute.
     */
-  def orderBy(orderBy: String): OverWindowPredefined = {
+  def orderBy(orderBy: String): OverWindowWithOrderBy = {
     val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowPredefined(Seq[Expression](), orderByExpr)
+    new OverWindowWithOrderBy(Seq[Expression](), orderByExpr)
   }
 
   /**
@@ -122,8 +122,8 @@ class PartitionedOver(private val partitionByExpr: 
Array[Expression]) {
     *
     * For batch tables, refer to a timestamp or long attribute.
     */
-  def orderBy(orderBy: String): OverWindowPredefined = {
+  def orderBy(orderBy: String): OverWindowWithOrderBy = {
     val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowPredefined(partitionByExpr, orderByExpr)
+    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 1230027..5e70440 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.scala
 
 import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, 
SlideWithSize, TumblingWindow}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, 
SlideWithSize, SessionWithGap}
 
 /**
   * Helper object for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
@@ -34,9 +34,9 @@ object Tumble {
     * elements in 5 minutes intervals.
     *
     * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
+    * @return a partially defined tumbling window
     */
-  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
+  def over(size: Expression): TumbleWithSize = new TumbleWithSize(size)
 }
 
 /**
@@ -79,9 +79,9 @@ object Session {
     *
     * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
     *            closing the session window.
-    * @return a session window
+    * @return a partially defined session window
     */
-  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+  def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap)
 }
 
 /**
@@ -96,8 +96,8 @@ object Over {
     *
     * For batch tables, refer to a timestamp or long attribute.
     */
-  def orderBy(orderBy: Expression): OverWindowPredefined = {
-    new OverWindowPredefined(Seq[Expression](), orderBy)
+  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
+    new OverWindowWithOrderBy(Seq[Expression](), orderBy)
   }
 
   /**
@@ -120,7 +120,7 @@ case class PartitionedOver(partitionBy: Array[Expression]) {
     *
     * For batch tables, refer to a timestamp or long attribute.
     */
-  def orderBy(orderBy: Expression): OverWindowPredefined = {
-    new OverWindowPredefined(partitionBy, orderBy)
+  def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
+    new OverWindowWithOrderBy(partitionBy, orderBy)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index c688aab..0953611 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -805,9 +805,6 @@ class Table(
     * @return A windowed table.
     */
   def window(window: Window): WindowedTable = {
-    if (window.alias.isEmpty) {
-      throw new ValidationException("An alias must be specified for the 
window.")
-    }
     new WindowedTable(this, window)
   }
 
@@ -936,7 +933,7 @@ class WindowedTable(
     * }}}
     */
   def groupBy(fields: Expression*): WindowGroupedTable = {
-    val fieldsWithoutWindow = fields.filterNot(window.alias.get.equals(_))
+    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
     if (fields.size != fieldsWithoutWindow.size + 1) {
       throw new ValidationException("GroupBy must contain exactly one window 
alias.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 577d77c..80260f7 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -35,32 +35,40 @@ case class OverWindow(
 
 case class CurrentRow() extends Expression {
   override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+
   override private[flink] def children = Seq()
+
   override def toString = "CURRENT ROW"
 }
 
 case class CurrentRange() extends Expression {
   override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
   override private[flink] def children = Seq()
+
   override def toString = "CURRENT RANGE"
 }
 
 case class UnboundedRow() extends Expression {
   override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+
   override private[flink] def children = Seq()
+
   override def toString = "UNBOUNDED ROW"
 }
 
 case class UnboundedRange() extends Expression {
   override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
   override private[flink] def children = Seq()
+
   override def toString = "UNBOUNDED RANGE"
 }
 
 /**
-  * An over window predefined  specification.
+  * A partially defined over window.
   */
-class OverWindowPredefined(
+class OverWindowWithOrderBy(
     private val partitionBy: Seq[Expression],
     private val orderBy: Expression) {
 
@@ -100,7 +108,7 @@ class OverWindowPredefined(
     * @param preceding preceding offset relative to the current row.
     * @return this over window
     */
-  def preceding(preceding: String): OverWindowPredefined = {
+  def preceding(preceding: String): OverWindowWithOrderBy = {
     this.preceding(ExpressionParser.parseExpression(preceding))
   }
 
@@ -110,7 +118,7 @@ class OverWindowPredefined(
     * @param preceding preceding offset relative to the current row.
     * @return this over window
     */
-  def preceding(preceding: Expression): OverWindowPredefined = {
+  def preceding(preceding: Expression): OverWindowWithOrderBy = {
     this.preceding = preceding
     this
   }
@@ -121,7 +129,7 @@ class OverWindowPredefined(
     * @param following following offset that relative to the current row.
     * @return this over window
     */
-  def following(following: String): OverWindowPredefined = {
+  def following(following: String): OverWindowWithOrderBy = {
     this.following(ExpressionParser.parseExpression(following))
   }
 
@@ -131,7 +139,7 @@ class OverWindowPredefined(
     * @param following following offset that relative to the current row.
     * @return this over window
     */
-  def following(following: Expression): OverWindowPredefined = {
+  def following(following: Expression): OverWindowWithOrderBy = {
     this.following = following
     this
   }
@@ -149,15 +157,20 @@ class OverWindowPredefined(
   *
   * For finite batch tables, window provides shortcuts for time-based groupBy.
   *
+  * @param alias The expression of alias for this Window
   */
-abstract class Window {
+abstract class Window(val alias: Expression) {
 
-  // The expression of alias for this Window
-  private[flink] var alias: Option[Expression] = None
   /**
     * Converts an API class to a logical window for planning.
     */
   private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A window specification without alias.
+  */
+abstract class WindowWithoutAlias {
 
   /**
     * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
@@ -166,10 +179,7 @@ abstract class Window {
     * @param alias alias for this window
     * @return this window
     */
-  def as(alias: Expression): Window = {
-    this.alias = Some(alias)
-    this
-  }
+  def as(alias: Expression): Window
 
   /**
     * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
@@ -182,36 +192,22 @@ abstract class Window {
 }
 
 /**
-  * A window operating on event-time.
-  *
-  * @param timeField defines the time mode for streaming tables. For batch 
table it defines the
-  *                  time attribute on which is grouped.
-  */
-abstract class EventTimeWindow(val timeField: Expression) extends Window
-
-// 
------------------------------------------------------------------------------------------------
-// Tumbling windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * Tumbling window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param size the size of the window either as time or row-count interval.
+  * A predefined specification of window on processing-time
   */
-class TumblingWindow(size: Expression) extends Window {
+abstract class ProcTimeWindowWithoutAlias extends WindowWithoutAlias {
 
   /**
-    * Tumbling window.
+    * Specifies the time attribute on which rows are grouped.
     *
     * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows
     * are grouped by processing-time.
     *
-    * @param size the size of the window either as time or row-count interval.
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param timeField time mode for streaming tables and time attribute for 
batch tables
+    * @return a predefined window on event-time
     */
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
+  def on(timeField: Expression): WindowWithoutAlias
 
   /**
     * Specifies the time attribute on which rows are grouped.
@@ -222,24 +218,85 @@ class TumblingWindow(size: Expression) extends Window {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a tumbling window on event-time
+    * @return a predefined window on event-time
     */
-  def on(timeField: Expression): TumblingEventTimeWindow =
-    new TumblingEventTimeWindow(timeField, size)
+  def on(timeField: String): WindowWithoutAlias =
+    on(ExpressionParser.parseExpression(timeField))
+}
+
+/**
+  * A window operating on event-time.
+  *
+  * For streaming tables call on('rowtime) to specify grouping by event-time.
+  * Otherwise rows are grouped by processing-time.
+  *
+  * For batch tables, refer to a timestamp or long attribute.
+  *
+  * @param timeField time mode for streaming tables and time attribute for 
batch tables
+  */
+abstract class EventTimeWindow(alias: Expression, val timeField: Expression) 
extends Window(alias)
+
+// 
------------------------------------------------------------------------------------------------
+// Tumbling windows
+// 
------------------------------------------------------------------------------------------------
+
+/**
+  * A partial specification of a tumbling window.
+  *
+  * @param size the size of the window either a time or a row-count interval.
+  */
+class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias {
+
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
 
   /**
     * Specifies the time attribute on which rows are grouped.
     *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows
-    * are grouped by processing-time.
+    * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time.
+    * Otherwise rows are grouped by processing-time.
     *
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a tumbling window on event-time
+    * @return a predefined window on event-time
     */
-  def on(timeField: String): TumblingEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
+  override def on(timeField: Expression): WindowWithoutAlias =
+    new TumbleWithoutAlias(timeField, size)
+
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  override def as(alias: Expression) = new TumblingWindow(alias, size)
+}
+
+/**
+  * A tumbling window on event-time without alias.
+  */
+class TumbleWithoutAlias(
+    time: Expression,
+    size: Expression) extends WindowWithoutAlias {
+
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  override def as(alias: Expression): Window = new 
TumblingEventTimeWindow(alias, time, size)
+}
+
+/**
+  * Tumbling window on processing-time.
+  *
+  * @param alias the alias of the window.
+  * @param size the size of the window either a time or a row-count interval.
+  */
+class TumblingWindow(alias: Expression, size: Expression) extends 
Window(alias) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeTumblingGroupWindow(alias, size)
@@ -249,9 +306,9 @@ class TumblingWindow(size: Expression) extends Window {
   * Tumbling window on event-time.
   */
 class TumblingEventTimeWindow(
+    alias: Expression,
     time: Expression,
-    size: Expression)
-  extends EventTimeWindow(time) {
+    size: Expression) extends EventTimeWindow(alias, time) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     EventTimeTumblingGroupWindow(alias, time, size)
@@ -262,16 +319,16 @@ class TumblingEventTimeWindow(
 // 
------------------------------------------------------------------------------------------------
 
 /**
-  * Partially specified sliding window.
+  * A partially specified sliding window.
   *
-  * @param size the size of the window either as time or row-count interval.
+  * @param size the size of the window either a time or a row-count interval.
   */
 class SlideWithSize(size: Expression) {
 
   /**
-    * Partially specified sliding window.
+    * A partially specified sliding window.
     *
-    * @param size the size of the window either as time or row-count interval.
+    * @param size the size of the window either a time or a row-count interval.
     */
   def this(size: String) = this(ExpressionParser.parseExpression(size))
 
@@ -286,9 +343,9 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count 
interval.
-    * @return a sliding window
+    * @return a predefined sliding window.
     */
-  def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
+  def every(slide: Expression): SlideWithSlide = new SlideWithSlide(size, 
slide)
 
   /**
     * Specifies the window's slide as time or row-count interval.
@@ -301,24 +358,17 @@ class SlideWithSize(size: Expression) {
     * windows.
     *
     * @param slide the slide of the window either as time or row-count 
interval.
-    * @return a sliding window
+    * @return a predefined sliding window.
     */
-  def every(slide: String): SlidingWindow = 
every(ExpressionParser.parseExpression(slide))
+  def every(slide: String): WindowWithoutAlias = 
every(ExpressionParser.parseExpression(slide))
 }
 
 /**
-  * Sliding window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param size the size of the window either as time or row-count interval.
+  * A partially defined sliding window.
   */
-class SlidingWindow(
+class SlideWithSlide(
     size: Expression,
-    slide: Expression)
-  extends Window {
-
+    slide: Expression) extends ProcTimeWindowWithoutAlias {
   /**
     * Specifies the time attribute on which rows are grouped.
     *
@@ -328,37 +378,65 @@ class SlidingWindow(
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a sliding window on event-time
+    * @return a predefined Sliding window on event-time.
     */
-  def on(timeField: Expression): SlidingEventTimeWindow =
-    new SlidingEventTimeWindow(timeField, size, slide)
+  override def on(timeField: Expression): SlideWithoutAlias =
+    new SlideWithoutAlias(timeField, size, slide)
 
   /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows
-    * are grouped by processing-time.
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * @param alias alias for this window
+    * @return this window
+    */
+  override def as(alias: Expression): Window = new SlidingWindow(alias, size, 
slide)
+}
+
+/**
+  * A partially defined sliding window on event-time without alias.
+  */
+class SlideWithoutAlias(
+    timeField: Expression,
+    size: Expression,
+    slide: Expression) extends WindowWithoutAlias {
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
     *
-    * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a sliding window on event-time
+    * @param alias alias for this window
+    * @return this window
     */
-  def on(timeField: String): SlidingEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
+  override def as(alias: Expression): Window =
+    new SlidingEventTimeWindow(alias, timeField, size, slide)
+}
+
+/**
+  * A sliding window on processing-time.
+  *
+  * @param alias the alias of the window.
+  * @param size the size of the window either a time or a row-count interval.
+  * @param slide the interval by which the window slides.
+  */
+class SlidingWindow(
+  alias: Expression,
+  size: Expression,
+  slide: Expression)
+  extends Window(alias) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSlidingGroupWindow(alias, size, slide)
 }
 
 /**
-  * Sliding window on event-time.
+  * A sliding window on event-time.
   */
 class SlidingEventTimeWindow(
+    alias: Expression,
     timeField: Expression,
     size: Expression,
     slide: Expression)
-  extends EventTimeWindow(timeField) {
+  extends EventTimeWindow(alias, timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     EventTimeSlidingGroupWindow(alias, timeField, size, slide)
@@ -369,23 +447,10 @@ class SlidingEventTimeWindow(
 // 
------------------------------------------------------------------------------------------------
 
 /**
-  * Session window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param gap the time interval of inactivity before a window is closed.
+  * A partially defined session window.
   */
-class SessionWindow(gap: Expression) extends Window {
+class SessionWithGap(gap: Expression) extends ProcTimeWindowWithoutAlias {
 
-  /**
-    * Session window.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * @param gap the time interval of inactivity before a window is closed.
-    */
   def this(gap: String) = this(ExpressionParser.parseExpression(gap))
 
   /**
@@ -397,36 +462,56 @@ class SessionWindow(gap: Expression) extends Window {
     * For batch tables, refer to a timestamp or long attribute.
     *
     * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a session window on event-time
+    * @return an on event-time session window on event-time
     */
-  def on(timeField: Expression): SessionEventTimeWindow =
-    new SessionEventTimeWindow(timeField, gap)
+  override def on(timeField: Expression): SessionWithoutAlias =
+    new SessionWithoutAlias(timeField, gap)
 
   /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by 
event-time. Otherwise rows
-    * are grouped by processing-time.
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
     *
-    * For batch tables, refer to a timestamp or long attribute.
+    * @param alias alias for this window
+    * @return this window
+    */
+  override def as(alias: Expression): Window = new SessionWindow(alias, gap)
+}
+
+/**
+  * A partially defined session window on event-time without alias.
+  */
+class SessionWithoutAlias(
+    timeField: Expression,
+    gap: Expression) extends WindowWithoutAlias {
+  /**
+    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
+    * refer to. `select()` statement can access window properties such as 
window start or end time.
     *
-    * @param timeField time mode for streaming tables and time attribute for 
batch tables
-    * @return a session window on event-time
+    * @param alias alias for this window
+    * @return this window
     */
-  def on(timeField: String): SessionEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
+  override def as(alias: Expression): Window = new 
SessionEventTimeWindow(alias, timeField, gap)
+}
+
+/**
+  * A session window on processing-time.
+  *
+  * @param gap the time interval of inactivity before a window is closed.
+  */
+class SessionWindow(alias: Expression, gap: Expression) extends Window(alias) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     ProcessingTimeSessionGroupWindow(alias, gap)
 }
 
 /**
-  * Session window on event-time.
+  * A session window on event-time.
   */
 class SessionEventTimeWindow(
+    alias: Expression,
     timeField: Expression,
     gap: Expression)
-  extends EventTimeWindow(timeField) {
+  extends EventTimeWindow(alias, timeField) {
 
   override private[flink] def toLogicalWindow: LogicalWindow =
     EventTimeSessionGroupWindow(alias, timeField, gap)

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 1264566..1884e54 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -22,14 +22,13 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.{Expression, WindowReference}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
 
-abstract class LogicalWindow(val alias: Option[Expression]) extends 
Resolvable[LogicalWindow] {
+abstract class LogicalWindow(val alias: Expression) extends 
Resolvable[LogicalWindow] {
 
   def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow 
= this
 
   def validate(tableEnv: TableEnvironment): ValidationResult = alias match {
-    case Some(WindowReference(_)) => ValidationSuccess
-    case Some(_) => ValidationFailure("Window reference for window expected.")
-    case None => ValidationSuccess
+    case WindowReference(_) => ValidationSuccess
+    case _ => ValidationFailure("Window reference for window expected.")
   }
 
   override def toString: String = getClass.getSimpleName

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index f1f058e..576756d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeIn
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
 
 abstract class EventTimeGroupWindow(
-    alias: Option[Expression],
+    alias: Expression,
     time: Expression)
   extends LogicalWindow(alias) {
 
@@ -55,7 +55,7 @@ abstract class EventTimeGroupWindow(
   }
 }
 
-abstract class ProcessingTimeGroupWindow(alias: Option[Expression]) extends 
LogicalWindow(alias) {
+abstract class ProcessingTimeGroupWindow(alias: Expression) extends 
LogicalWindow(alias) {
   override def validate(tableEnv: TableEnvironment): ValidationResult = {
     val valid = super.validate(tableEnv)
     if (valid.isFailure) {
@@ -88,13 +88,13 @@ object TumblingGroupWindow {
 }
 
 case class ProcessingTimeTumblingGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     size: Expression)
   extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     ProcessingTimeTumblingGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(size))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
@@ -104,7 +104,7 @@ case class ProcessingTimeTumblingGroupWindow(
 }
 
 case class EventTimeTumblingGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     timeField: Expression,
     size: Expression)
   extends EventTimeGroupWindow(
@@ -113,7 +113,7 @@ case class EventTimeTumblingGroupWindow(
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     EventTimeTumblingGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(timeField),
       resolve(size))
 
@@ -177,14 +177,14 @@ object SlidingGroupWindow {
 }
 
 case class ProcessingTimeSlidingGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     size: Expression,
     slide: Expression)
   extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     ProcessingTimeSlidingGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(size),
       resolve(slide))
 
@@ -195,7 +195,7 @@ case class ProcessingTimeSlidingGroupWindow(
 }
 
 case class EventTimeSlidingGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     timeField: Expression,
     size: Expression,
     slide: Expression)
@@ -203,7 +203,7 @@ case class EventTimeSlidingGroupWindow(
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     EventTimeSlidingGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(timeField),
       resolve(size),
       resolve(slide))
@@ -240,13 +240,13 @@ object SessionGroupWindow {
 }
 
 case class ProcessingTimeSessionGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     gap: Expression)
   extends ProcessingTimeGroupWindow(alias) {
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     ProcessingTimeSessionGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(gap))
 
   override def validate(tableEnv: TableEnvironment): ValidationResult =
@@ -256,7 +256,7 @@ case class ProcessingTimeSessionGroupWindow(
 }
 
 case class EventTimeSessionGroupWindow(
-    override val alias: Option[Expression],
+    override val alias: Expression,
     timeField: Expression,
     gap: Expression)
   extends EventTimeGroupWindow(
@@ -265,7 +265,7 @@ case class EventTimeSessionGroupWindow(
 
   override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
     EventTimeSessionGroupWindow(
-      alias.map(resolve),
+      resolve(alias),
       resolve(timeField),
       resolve(gap))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 559bd75..f1bb644 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -568,7 +568,7 @@ case class WindowAggregate(
     case _ =>
       window.alias match {
         // resolve reference to this window's alias
-        case Some(UnresolvedFieldReference(alias)) if name == alias =>
+        case UnresolvedFieldReference(alias) if name == alias =>
           // check if reference can already be resolved by input fields
           val found = super.resolveReference(tableEnv, name)
           if (found.isDefined) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index c68da04..4da2da9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -53,8 +53,8 @@ class WindowStartEndPropertiesRule
     transformed.push(LogicalWindowAggregate.create(
       agg.getWindow,
       Seq(
-        NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
-        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
+        NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias)),
+        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias))
       ), agg)
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index cd6c77d..9d16536 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -46,7 +46,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("select", "ts, a, b")
         ),
-        term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
7200000.millis)),
+        term("window", EventTimeTumblingGroupWindow('w$, 'ts, 7200000.millis)),
         term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
@@ -75,7 +75,7 @@ class WindowAggregateTest extends TableTestBase {
           "DataSetWindowAggregate",
           batchTableNode(0),
           term("groupBy", "c"),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
240000.millis)),
+          term("window", EventTimeTumblingGroupWindow('w$, 'ts, 
240000.millis)),
           term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
@@ -104,7 +104,7 @@ class WindowAggregateTest extends TableTestBase {
           term("select", "ts, a, b")
         ),
         term("window",
-          EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 
900000.millis)),
+          EventTimeSlidingGroupWindow('w$, 'ts, 5400000.millis, 
900000.millis)),
         term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
@@ -134,7 +134,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("groupBy", "c, d"),
           term("window",
-            EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 
3600000.millis)),
+            EventTimeSlidingGroupWindow('w$, 'ts, 10800000.millis, 
3600000.millis)),
           term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),
@@ -160,7 +160,7 @@ class WindowAggregateTest extends TableTestBase {
           batchTableNode(0),
           term("select", "ts")
         ),
-        term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
1800000.millis)),
+        term("window", EventTimeSessionGroupWindow('w$, 'ts, 1800000.millis)),
         term("select", "COUNT(*) AS cnt")
       )
 
@@ -189,7 +189,7 @@ class WindowAggregateTest extends TableTestBase {
           "DataSetWindowAggregate",
           batchTableNode(0),
           term("groupBy", "c, d"),
-          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
43200000.millis)),
+          term("window", EventTimeSessionGroupWindow('w$, 'ts, 
43200000.millis)),
           term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
             "start('w$) AS w$start, end('w$) AS w$end")
         ),

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 4d0d9aa..6ebfec0 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -239,7 +239,7 @@ class FieldProjectionTest extends TableTestBase {
         ),
         term("window",
           EventTimeTumblingGroupWindow(
-            Some(WindowReference("w")),
+           WindowReference("w"),
             RowtimeAttribute(),
             5.millis)),
         term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1")
@@ -268,7 +268,7 @@ class FieldProjectionTest extends TableTestBase {
           term("groupBy", "b"),
           term("window",
             EventTimeTumblingGroupWindow(
-              Some(WindowReference("w")),
+             WindowReference("w"),
               RowtimeAttribute(),
               5.millis)),
           term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index a778b40..8f4eea5 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -97,7 +97,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 
'long, 2.rows)),
+      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 
2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -118,7 +118,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 
'long, 5.milli)),
+      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 
5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -164,7 +164,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 
'long, 5.milli)),
+      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 
5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -188,7 +188,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "int", "long")
       ),
-      term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 
'long, 2.rows)),
+      term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 
2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -236,7 +236,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 
8.milli, 10.milli)),
+        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 
10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -258,7 +258,7 @@ class GroupWindowTest extends TableTestBase {
       batchTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 
1.rows)),
+        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 
1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -294,7 +294,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "long")
       ),
       term("window",
-        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 
8.milli, 10.milli)),
+        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 
10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -319,7 +319,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "long")
       ),
       term("window",
-        EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 
1.rows)),
+        EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 
1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -344,7 +344,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(Some(WindowReference("w")), 
'long, 7.milli)),
+      term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 
7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 324b4d6..578a6a8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -102,7 +102,7 @@ class WindowAggregateTest extends TableTestBase {
             streamTableNode(0),
             term("select", "1970-01-01 00:00:00 AS $f0")
           ),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 
900000.millis)),
+          term("window", EventTimeTumblingGroupWindow('w$, 'rowtime, 
900000.millis)),
           term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) 
AS w$end")
         ),
         term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS 
w$end")
@@ -128,7 +128,7 @@ class WindowAggregateTest extends TableTestBase {
             streamTableNode(0),
             term("select", "1970-01-01 00:00:00 AS $f0")
           ),
-          term("window", ProcessingTimeSlidingGroupWindow(Some('w$),
+          term("window", ProcessingTimeSlidingGroupWindow('w$,
             3600000.millis, 900000.millis)),
           term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) 
AS w$end")
         ),
@@ -156,7 +156,7 @@ class WindowAggregateTest extends TableTestBase {
             streamTableNode(0),
             term("select", "1970-01-01 00:00:00 AS $f0")
           ),
-          term("window", ProcessingTimeSessionGroupWindow(Some('w$), 
900000.millis)),
+          term("window", ProcessingTimeSessionGroupWindow('w$, 900000.millis)),
           term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) 
AS w$end")
         ),
         term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS 
w$end")

http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index fde7682..402ebbe 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -195,7 +195,7 @@ class GroupWindowTest extends TableTestBase {
           term(
             "window",
             ProcessingTimeTumblingGroupWindow(
-              Some(WindowReference("w1")),
+              WindowReference("w1"),
               50.milli)),
           term("select", "string", "COUNT(int) AS TMP_0")
         ),
@@ -204,7 +204,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeSlidingGroupWindow(
-          Some(WindowReference("w2")),
+          WindowReference("w2"),
           20.milli, 10.milli)),
       term("select", "COUNT(string) AS TMP_1")
     )
@@ -232,7 +232,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -261,7 +261,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeTumblingGroupWindow(
-          Some(WindowReference("w")), 2.rows)),
+          WindowReference("w"), 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -289,7 +289,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(),
           5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
@@ -316,7 +316,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -345,7 +345,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           50.milli, 50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -374,7 +374,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -403,7 +403,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 8.milli, 10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -429,7 +429,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 2.rows, 1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -458,7 +458,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSessionGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -487,7 +487,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -515,7 +515,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -543,7 +543,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -572,7 +572,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -601,7 +601,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           50.milli, 50.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -629,7 +629,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         ProcessingTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -657,7 +657,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 8.milli, 10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -686,7 +686,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 2.rows, 1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -714,7 +714,7 @@ class GroupWindowTest extends TableTestBase {
       term(
         "window",
         EventTimeSessionGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(), 7.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -742,7 +742,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term("window",
         EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(),
           5.milli)),
       term("select",
@@ -775,7 +775,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term("window",
         EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
+          WindowReference("w"),
           RowtimeAttribute(),
           10.milli,
           5.milli)),
@@ -811,7 +811,7 @@ class GroupWindowTest extends TableTestBase {
         term("groupBy", "string"),
         term("window",
           EventTimeSessionGroupWindow(
-            Some(WindowReference("w")),
+            WindowReference("w"),
             RowtimeAttribute(),
             3.milli)),
         term("select",
@@ -849,7 +849,7 @@ class GroupWindowTest extends TableTestBase {
         term("groupBy", "string"),
         term("window",
           EventTimeTumblingGroupWindow(
-            Some(WindowReference("w")),
+            WindowReference("w"),
             RowtimeAttribute(),
             5.millis)),
         term("select",

Reply via email to