[FLINK-6228] [table] Add support for OVER windows to streaming Table API.

This closes #3743.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe018921
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe018921
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe018921

Branch: refs/heads/master
Commit: fe018921ed0b24f94ab2139f04293d6074ce4fba
Parents: 1664447
Author: sunjincheng121 <sunjincheng...@gmail.com>
Authored: Thu Apr 13 17:36:18 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Apr 24 18:39:30 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/java/groupWindows.scala     |  84 ---
 .../apache/flink/table/api/java/windows.scala   | 129 ++++
 .../flink/table/api/scala/expressionDsl.scala   |  27 +-
 .../flink/table/api/scala/groupWindows.scala    |  85 ---
 .../apache/flink/table/api/scala/windows.scala  | 126 ++++
 .../org/apache/flink/table/api/table.scala      |  62 ++
 .../org/apache/flink/table/api/windows.scala    | 118 +++-
 .../table/expressions/ExpressionParser.scala    |  64 +-
 .../flink/table/expressions/aggregations.scala  |  36 +-
 .../apache/flink/table/expressions/call.scala   | 211 ++++++-
 .../flink/table/plan/ProjectionTranslator.scala |  51 +-
 .../scala/stream/table/OverWindowITCase.scala   | 331 ++++++++++
 .../api/scala/stream/table/OverWindowTest.scala | 596 +++++++++++++++++++
 .../OverWindowStringExpressionTest.scala        | 151 +++++
 14 files changed, 1880 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
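
For reference, a minimal end-to-end sketch of the new Scala API, condensed from
the OverWindowITCase added below (the input fields 'a, 'b, 'c and the window
alias 'w follow the tests):

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val table = env.fromCollection(Seq((1L, 1, "Hello"))).toTable(tEnv, 'a, 'b, 'c)

    // unbounded, partitioned OVER window in processing time
    val result = table
      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select('c, 'b.count over 'w as 'cnt)

    result.toDataStream[Row].print()
    env.execute()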


http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
deleted file mode 100644
index 9c82e9b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java
-
-import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
-  * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
-  * elements in 5 minutes intervals.
-  */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
-    * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
-    */
-  def over(size: String): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
-  * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
-  * a specified slide interval. If the slide interval is smaller than the window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
-  * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
-  * window evaluations.
-  */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
-    * consecutive window evaluations.
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: String): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
-  * Helper class for creating a session window. The boundary of session windows are defined by
-  * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
-  * gap period.
-  */
-object Session {
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
-    *            closing the session window.
-    * @return a session window
-    */
-  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
new file mode 100644
index 0000000..1e9d65a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java
+
+import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow}
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+
+/**
+  * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+  * elements in 5 minute intervals.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
+    * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+    * elements in 5 minute intervals.
+    *
+    * @param size the size of the window as a time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: String): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows overlap. Thus, an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups elements
+  * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+  * window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows overlap. Thus, an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups
+    * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as a time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: String): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Helper object for creating a session window. The boundaries of session windows are defined by
+  * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+  * gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as an interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+}
+
+/**
+  * Helper object for creating an over window.
+  */
+object Over {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, call orderBy("rowtime") or orderBy("proctime") to specify the
+    * time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    */
+  def orderBy(orderBy: String): OverWindowPredefined = {
+    val orderByExpr = ExpressionParser.parseExpression(orderBy)
+    new OverWindowPredefined(Seq[Expression](), orderByExpr)
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * @param partitionBy some partition keys.
+    * @return a PartitionedOver instance that only offers the orderBy method.
+    */
+  def partitionBy(partitionBy: String): PartitionedOver = {
+    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
+    new PartitionedOver(partitionByExpr)
+  }
+}
+
+class PartitionedOver(private val partitionByExpr: Array[Expression]) {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, call orderBy("rowtime") or orderBy("proctime") to specify the
+    * time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    */
+  def orderBy(orderBy: String): OverWindowPredefined = {
+    val orderByExpr = ExpressionParser.parseExpression(orderBy)
+    new OverWindowPredefined(partitionByExpr, orderByExpr)
+  }
+}
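
The same kind of window can be defined through the string-based builders above;
a sketch (shown in Scala for brevity, calling the java-facing classes, and
assuming a streaming Table `table` with fields b and c; the select() string
syntax is exercised by the new OverWindowStringExpressionTest):

    import org.apache.flink.table.api.java.Over

    val w = Over.partitionBy("c").orderBy("proctime").preceding("unbounded_row").as("w")
    val result = table.window(w).select("c, b.count over w")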

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 06d46e3..fc6cb10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.api.scala
 
+import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
 import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions._
-import java.math.{BigDecimal => JBigDecimal}
 
 import scala.language.implicitConversions
 
@@ -364,6 +365,23 @@ trait ImplicitExpressionOperations {
   def position(haystack: Expression) = Position(expr, haystack)
 
   /**
+    * Specifies an over window for an aggregation,
+    * e.g.:
+    * table
+    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+    */
+  def over(alias: Expression) = {
+    expr match {
+      case _: Aggregation => UnresolvedOverCall(
+        expr.asInstanceOf[Aggregation],
+        alias)
+      case _ => throw new TableException(
+        "The over method can only be used with an aggregation expression.")
+    }
+  }
+
+  /**
    * Replaces a substring of string with a string starting at a position (starting at 1).
     *
     * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
@@ -586,6 +604,13 @@ trait ImplicitExpressionOperations {
  * to [[ImplicitExpressionOperations]].
  */
 trait ImplicitExpressionConversions {
+
+  implicit val UNBOUNDED_ROW = UnboundedRow()
+  implicit val UNBOUNDED_RANGE = UnboundedRange()
+
+  implicit val CURRENT_ROW = CurrentRow()
+  implicit val CURRENT_RANGE = CurrentRange()
+
   implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
     def expr = e
   }
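
As the scaladoc above indicates, over() only accepts aggregation expressions; a
short sketch of what the new operation produces (the 'w alias must match a
window defined in a preceding window() clause):

    // valid: count is an Aggregation, so this yields an UnresolvedOverCall bound to 'w
    val agg = 'a.count over 'w

    // invalid: 'a + 1 is not an Aggregation, so over() throws a TableException
    // val bad = ('a + 1) over 'w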

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
deleted file mode 100644
index 0e4c1c7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/groupWindows.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala
-
-import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
-
-/**
-  * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
-  * elements in 5 minutes intervals.
-  */
-object Tumble {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
-    * windows. For example, a tumbling window of 5 minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a tumbling window
-    */
-  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
-}
-
-/**
-  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
-  * a specified slide interval. If the slide interval is smaller than the window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
-  * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
-  * window evaluations.
-  */
-object Slide {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
-    * consecutive
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
-  * Helper object for creating a session window. The boundary of session windows are defined by
-  * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
-  * gap period.
-  */
-object Session {
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for new data before
-    *            closing the session window.
-    * @return a session window
-    */
-  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
new file mode 100644
index 0000000..1230027
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+  * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+  * elements in 5 minute intervals.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
+    * windows. For example, a tumbling window of 5 minutes size groups
+    * elements in 5 minute intervals.
+    *
+    * @param size the size of the window as a time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: Expression): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows overlap. Thus, an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups elements
+  * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+  * window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows overlap. Thus, an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of size 15 minutes with a 5 minute slide interval groups
+    * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+    * consecutive window evaluations.
+    *
+    * @param size the size of the window as a time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Helper object for creating a session window. The boundaries of session windows are defined by
+  * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+  * gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+    * gap period.
+    *
+    * @param gap specifies how long (as an interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
+}
+
+/**
+  * Helper object for creating an over window.
+  */
+object Over {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, call orderBy('rowtime) or orderBy('proctime) to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    */
+  def orderBy(orderBy: Expression): OverWindowPredefined = {
+    new OverWindowPredefined(Seq[Expression](), orderBy)
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * @param partitionBy some partition keys.
+    * @return a PartitionedOver instance that only offers the orderBy method.
+    */
+  def partitionBy(partitionBy: Expression*): PartitionedOver = {
+    PartitionedOver(partitionBy.toArray)
+  }
+}
+
+case class PartitionedOver(partitionBy: Array[Expression]) {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, call orderBy('rowtime) or orderBy('proctime) to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    */
+  def orderBy(orderBy: Expression): OverWindowPredefined = {
+    new OverWindowPredefined(partitionBy, orderBy)
+  }
+}
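
The two entry points above enforce a fixed clause order; a sketch (window
aliases are illustrative):

    // partitioned: partitionBy(...) returns a PartitionedOver that only offers
    // orderBy, so the partitioning always precedes the ordering
    val w1 = Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w1

    // unpartitioned: legal, but the whole stream is then processed with parallelism 1
    val w2 = Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w2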

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 769e940..c688aab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.sinks.TableSink
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.annotation.varargs
 
 /**
   * A Table is the core component of the Table API.
@@ -810,6 +811,44 @@ class Table(
     new WindowedTable(this, window)
   }
 
+  /**
+    * Defines over-windows on the records of a table.
+    *
+    * An over-window defines for each record an interval of records over which aggregation
+    * functions can be computed.
+    *
+    * Example:
+    *
+    * {{{
+    *   table
+    *     .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
+    *     .select('c, 'b.count over 'ow, 'e.sum over 'ow)
+    * }}}
+    *
+    * __Note__: Computing over window aggregates on a streaming table is only a parallel operation
+    * if the window is partitioned. Otherwise, the whole stream will be processed by a single
+    * task, i.e., with parallelism 1.
+    *
+    * __Note__: Over-windows for batch tables are currently not supported.
+    *
+    * @param overWindows windows that specify the record interval over which aggregations are
+    *                    computed.
+    * @return An OverWindowedTable to specify the aggregations.
+    */
+  @varargs
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw TableException("Over-windows for batch tables are currently not supported.")
+    }
+
+    if (overWindows.size != 1) {
+      throw TableException("Over-windows are currently only supported for a single window.")
+    }
+
+    new OverWindowedTable(this, overWindows.toArray)
+  }
+
   var tableName: String = _
 
   /**
@@ -928,6 +967,29 @@ class WindowedTable(
 
 }
 
+class OverWindowedTable(
+    private[flink] val table: Table,
+    private[flink] val overWindows: Array[OverWindow]) {
+
+  def select(fields: Expression*): Table = {
+    val expandedFields = expandProjectList(
+      fields,
+      table.logicalPlan,
+      table.tableEnv)
+
+    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
+
+    new Table(
+      table.tableEnv,
+      Project(expandedOverFields.map(UnresolvedAlias), table.logicalPlan).validate(table.tableEnv))
+  }
+
+  def select(fields: String): Table = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+}
+
 class WindowGroupedTable(
     private[flink] val table: Table,
     private[flink] val groupKeys: Seq[Expression],
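
OverWindowedTable offers both select() variants; a sketch assuming a streaming
table with fields 'b and 'c:

    val t1 = table
      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select('c, 'b.count over 'w)

    // equivalent, using the string-based variant that is parsed by ExpressionParser
    val t2 = table
      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
      .select("c, b.count over w")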

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 5ba6934..577d77c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -18,8 +18,124 @@
 
 package org.apache.flink.table.api
 
-import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
+
+/**
+  * An over window is similar to the traditional SQL OVER clause.
+  */
+case class OverWindow(
+    private[flink] val alias: Expression,
+    private[flink] val partitionBy: Seq[Expression],
+    private[flink] val orderBy: Expression,
+    private[flink] val preceding: Expression,
+    private[flink] val following: Expression)
+
+case class CurrentRow() extends Expression {
+  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def children = Seq()
+  override def toString = "CURRENT ROW"
+}
+
+case class CurrentRange() extends Expression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  override private[flink] def children = Seq()
+  override def toString = "CURRENT RANGE"
+}
+
+case class UnboundedRow() extends Expression {
+  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def children = Seq()
+  override def toString = "UNBOUNDED ROW"
+}
+
+case class UnboundedRange() extends Expression {
+  override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
+  override private[flink] def children = Seq()
+  override def toString = "UNBOUNDED RANGE"
+}
+
+/**
+  * A partially defined over window specification.
+  */
+class OverWindowPredefined(
+    private val partitionBy: Seq[Expression],
+    private val orderBy: Expression) {
+
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = _
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: Expression): OverWindow = {
+
+    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
+    if (null == following) {
+      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
+        following = CURRENT_ROW
+      } else {
+        following = CURRENT_RANGE
+      }
+    }
+    OverWindow(alias, partitionBy, orderBy, preceding, following)
+  }
+
+  /**
+    * Sets the preceding offset (based on a time or row-count interval) for the over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return this over window
+    */
+  def preceding(preceding: String): OverWindowPredefined = {
+    this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+    * Sets the preceding offset (based on a time or row-count interval) for the over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return this over window
+    */
+  def preceding(preceding: Expression): OverWindowPredefined = {
+    this.preceding = preceding
+    this
+  }
+
+  /**
+    * Sets the following offset (based on a time or row-count interval) for the over window.
+    *
+    * @param following following offset relative to the current row.
+    * @return this over window
+    */
+  def following(following: String): OverWindowPredefined = {
+    this.following(ExpressionParser.parseExpression(following))
+  }
+
+  /**
+    * Sets the following offset (based on a time or row-count interval) for the over window.
+    *
+    * @param following following offset relative to the current row.
+    * @return this over window
+    */
+  def following(following: Expression): OverWindowPredefined = {
+    this.following = following
+    this
+  }
+}
 
 /**
   * A window specification.
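
Because as() fills in a missing following offset, these two definitions are
equivalent (a sketch; CURRENT_RANGE is chosen instead for time intervals):

    val w1 = Over partitionBy 'c orderBy 'proctime preceding 2.rows as 'w
    val w2 = Over partitionBy 'c orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w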

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index ed0b16e..1985248 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.expressions
 
 import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.ExpressionParserException
+import org.apache.flink.table.api.{ExpressionParserException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
 import org.apache.flink.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
@@ -87,11 +87,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val STAR: Keyword = Keyword("*")
   lazy val GET: Keyword = Keyword("get")
   lazy val FLATTEN: Keyword = Keyword("flatten")
+  lazy val OVER: Keyword = Keyword("over")
+  lazy val CURRENT_ROW: Keyword = Keyword("current_row")
+  lazy val CURRENT_RANGE: Keyword = Keyword("current_range")
+  lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
+  lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
-      not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~
-      not(IF) ~> super.ident
+      not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
+      not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
+      super.ident
 
   // symbols
 
@@ -166,8 +172,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     dt => Null(dt)
   }
 
+  // OVER constants
+  lazy val currentRange: PackratParser[Expression] = CURRENT_RANGE ^^ {
+    _ => CurrentRange()
+  }
+  lazy val currentRow: PackratParser[Expression] = CURRENT_ROW ^^ {
+    _ => CurrentRow()
+  }
+  lazy val unboundedRange: PackratParser[Expression] = UNBOUNDED_RANGE ^^ {
+    _ => UnboundedRange()
+  }
+  lazy val unboundedRow: PackratParser[Expression] = UNBOUNDED_ROW ^^ {
+    _ => UnboundedRow()
+  }
+  lazy val overConstant: PackratParser[Expression] =
+    currentRange | currentRow | unboundedRange | unboundedRow
+
   lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral
+    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral |
+      overConstant
 
   lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
     sym => UnresolvedFieldReference(sym)
@@ -289,12 +312,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixFlattening: PackratParser[Expression] =
     composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
 
+  lazy val suffixAgg: PackratParser[Expression] =
+    suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg
+
   lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
-      suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
-      suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
-      suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
-      suffixGet | suffixFlattening |
+    suffixTimeInterval | suffixRowInterval | suffixStart | suffixEnd | suffixAgg |
+      suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc |
+      suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | suffixExtract |
+      suffixFloor | suffixCeil | suffixGet | suffixFlattening |
       suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
 
   // prefix operators
@@ -375,15 +400,26 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val prefixFlattening: PackratParser[Expression] =
     FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
 
+  lazy val prefixAgg: PackratParser[Expression] =
+    prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg
+
   lazy val prefixed: PackratParser[Expression] =
-    prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
-      prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs |
-      prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet | prefixFlattening |
-      prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
+    prefixArray | prefixAgg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim |
+      prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet |
+      prefixFlattening | prefixFunctionCall |
+      prefixFunctionCallOneArg // function call must always be at the end
+
+  // over
+
+  lazy val over: PackratParser[Expression] = suffixAgg ~ OVER ~ fieldReference ^^ {
+    case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef)
+  } | prefixAgg ~ OVER ~ fieldReference ^^ {
+    case agg ~ _ ~ windowRef => UnresolvedOverCall(agg, windowRef)
+  }
 
   // suffix/prefix composite
 
-  lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
+  lazy val composite: PackratParser[Expression] = suffixed | over | prefixed | atom |
     failure("Composite expression expected.")
 
   // unary ops
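
A sketch of what the extended grammar accepts (this is what the string-based
API feeds through ExpressionParser):

    import org.apache.flink.table.expressions.ExpressionParser

    // suffix and prefix aggregations followed by 'over' and a window alias
    val e1 = ExpressionParser.parseExpression("a.count over w")
    val e2 = ExpressionParser.parseExpression("count(a) over w")

    // the new bound constants parse to CurrentRow/CurrentRange/UnboundedRow/UnboundedRange
    val e3 = ExpressionParser.parseExpression("unbounded_row")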

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index b2fca88..4ef5209 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -18,11 +18,14 @@
 package org.apache.flink.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.fun._
+import org.apache.calcite.sql.SqlKind._
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeFactory
 
 abstract sealed class Aggregation extends UnaryExpression {
 
@@ -35,6 +38,12 @@ abstract sealed class Aggregation extends UnaryExpression {
     * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
     */
   private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
+
+  /**
+    * Returns the SqlAggFunction for this Aggregation.
+    */
+  private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
+
 }
 
 case class Sum(child: Expression) extends Aggregation {
@@ -48,6 +57,13 @@ case class Sum(child: Expression) extends Aggregation {
 
   override private[flink] def validateInput() =
     TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    val returnType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(resultType)
+    new SqlSumAggFunction(returnType)
+  }
 }
 
 case class Min(child: Expression) extends Aggregation {
@@ -61,6 +77,10 @@ case class Min(child: Expression) extends Aggregation {
 
   override private[flink] def validateInput() =
     TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    new SqlMinMaxAggFunction(MIN)
+  }
 }
 
 case class Max(child: Expression) extends Aggregation {
@@ -74,6 +94,10 @@ case class Max(child: Expression) extends Aggregation {
 
   override private[flink] def validateInput() =
     TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    new SqlMinMaxAggFunction(MAX)
+  }
 }
 
 case class Count(child: Expression) extends Aggregation {
@@ -84,6 +108,10 @@ case class Count(child: Expression) extends Aggregation {
   }
 
   override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    new SqlCountAggFunction()
+  }
 }
 
 case class Avg(child: Expression) extends Aggregation {
@@ -97,4 +125,8 @@ case class Avg(child: Expression) extends Aggregation {
 
   override private[flink] def validateInput() =
     TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
+
+  override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
+    new SqlAvgAggFunction(AVG)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index 40db13e..68ed688 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -17,15 +17,25 @@
  */
 package org.apache.flink.table.expressions
 
-import org.apache.calcite.rex.RexNode
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rex.RexWindowBound._
+import org.apache.calcite.rex.{RexFieldCollation, RexNode, RexWindowBound}
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
+import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions._
 import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+import _root_.scala.collection.JavaConverters._
 
 /**
   * General expression for unresolved function calls. The function can be a built-in
@@ -49,6 +59,201 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }
 
 /**
+  * An over call with an unresolved alias referencing an over window.
+  *
+  * @param agg The aggregation of the over call.
+  * @param alias The alias of the referenced over window.
+  */
+case class UnresolvedOverCall(agg: Expression, alias: Expression) extends Expression {
+
+  override private[flink] def validateInput() =
+    ValidationFailure(s"Over window with alias $alias could not be resolved.")
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def children = Seq()
+}
+
+/**
+  * Over expression for Calcite over transform.
+  *
+  * @param agg            over-agg expression
+  * @param partitionBy    The fields by which the over window is partitioned
+  * @param orderBy        The field by which the over window is sorted
+  * @param preceding      The lower bound of the window
+  * @param following      The upper bound of the window
+  */
+case class OverCall(
+    agg: Expression,
+    partitionBy: Seq[Expression],
+    orderBy: Expression,
+    preceding: Expression,
+    following: Expression) extends Expression {
+
+  override def toString: String = s"$agg OVER (" +
+    s"PARTITION BY (${partitionBy.mkString(", ")}) " +
+    s"ORDER BY $orderBy " +
+    s"PRECEDING $preceding " +
+    s"FOLLOWING $following)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+
+    val rexBuilder = relBuilder.getRexBuilder
+
+    // assemble aggregation
+    val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
+    val aggResultType = relBuilder
+      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(agg.resultType)
+
+    val aggChildName = agg.asInstanceOf[Aggregation].child.asInstanceOf[ResolvedFieldReference].name
+    val aggExprs = List(relBuilder.field(aggChildName).asInstanceOf[RexNode]).asJava
+
+    // assemble order by key
+    val orderKey = orderBy match {
+      case _: RowTime =>
+        new RexFieldCollation(relBuilder.call(EventTimeExtractor), Set[SqlKind]().asJava)
+      case _: ProcTime =>
+        new RexFieldCollation(relBuilder.call(ProcTimeExtractor), Set[SqlKind]().asJava)
+      case _ =>
+        throw new ValidationException("Invalid OrderBy expression.")
+    }
+    val orderKeys = ImmutableList.of(orderKey)
+
+    // assemble partition by keys
+    val partitionKeys = partitionBy.map(_.toRexNode(relBuilder)).asJava
+
+    // assemble bounds
+    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
+    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
+
+    // build RexOver
+    rexBuilder.makeOver(
+      aggResultType,
+      operator,
+      aggExprs,
+      partitionKeys,
+      orderKeys,
+      lowerBound,
+      upperBound,
+      isPhysical,
+      true,
+      false)
+  }
+
+  private def createBound(
+    relBuilder: RelBuilder,
+    bound: Expression,
+    sqlKind: SqlKind): RexWindowBound = {
+
+    bound match {
+      case _: UnboundedRow | _: UnboundedRange =>
+        val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+        create(unbounded, null)
+      case _: CurrentRow | _: CurrentRange =>
+        val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+        create(currentRow, null)
+      case b: Literal =>
+        val returnType = relBuilder
+          .getTypeFactory.asInstanceOf[FlinkTypeFactory]
+          .createTypeFromTypeInfo(Types.DECIMAL)
+
+        val sqlOperator = new SqlPostfixOperator(
+          sqlKind.name,
+          sqlKind,
+          2,
+          new OrdinalReturnTypeInference(0),
+          null,
+          null)
+
+        val operands: Array[SqlNode] = new Array[SqlNode](1)
+        operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
+
+        val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+        val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+        expressions.add(relBuilder.literal(b.value))
+
+        val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
+
+        create(node, rexNode)
+    }
+  }
+
+  override private[flink] def children: Seq[Expression] =
+    Seq(agg) ++ Seq(orderBy) ++ partitionBy ++ Seq(preceding) ++ Seq(following)
+
+  override private[flink] def resultType = agg.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+
+    // check that agg expression is aggregation
+    agg match {
+      case _: Aggregation =>
+        ValidationSuccess
+      case _ =>
+        return ValidationFailure(s"OVER can only be applied on an 
aggregation.")
+    }
+
+    // check partitionBy expression keys are resolved field reference
+    partitionBy.foreach {
+      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+        ValidationSuccess
+      case r: ResolvedFieldReference =>
+        return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must return key type.")
+      case r =>
+        return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
+          s"Expression must be a resolved field reference.")
+    }
+
+    // check preceding is valid
+    preceding match {
+      case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
+        ValidationSuccess
+      case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+        ValidationSuccess
+      case Literal(_, _: RowIntervalTypeInfo) =>
+        return ValidationFailure("Preceding row interval must be larger than 0.")
+      case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
+        ValidationSuccess
+      case Literal(_, _: TimeIntervalTypeInfo[_]) =>
+        return ValidationFailure("Preceding time interval must be equal to or larger than 0.")
+      case Literal(_, _) =>
+        return ValidationFailure("Preceding must be a row interval or time interval literal.")
+    }
+
+    // check following is valid
+    following match {
+      case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
+        ValidationSuccess
+      case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+        ValidationSuccess
+      case Literal(_, _: RowIntervalTypeInfo) =>
+        return ValidationFailure("Following row interval must be larger than 0.")
+      case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
+        ValidationSuccess
+      case Literal(_, _: TimeIntervalTypeInfo[_]) =>
+        return ValidationFailure("Following time interval must be equal to or larger than 0.")
+      case Literal(_, _) =>
+        return ValidationFailure("Following must be a row interval or time interval literal.")
+    }
+
+    // check that preceding and following are of same type
+    (preceding, following) match {
+      case (p: Expression, f: Expression) if p.resultType == f.resultType =>
+        ValidationSuccess
+      case _ =>
+        return ValidationFailure("Preceding and following must be of same 
interval type.")
+    }
+
+    ValidationSuccess
+  }
+}
+
+/**
   * Expression for calling a user-defined scalar functions.
   *
   * @param scalarFunction scalar function to be called (might be overloaded)
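
validateInput() above requires both bounds to be bound constants or positive
interval literals of the same kind; a sketch of window definitions that pass
and fail the check:

    // valid: both bounds are row-count intervals
    Over partitionBy 'c orderBy 'proctime preceding 10.rows following CURRENT_ROW as 'w

    // invalid: mixing a row-count preceding with a time-interval following
    // fails with "Preceding and following must be of same interval type."
    // Over partitionBy 'c orderBy 'proctime preceding 10.rows following 10.minutes as 'w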

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 94a0aa1..0d45a37 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.table.plan
 
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{OverWindow, StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.{ProcTime, RowTime}
 import org.apache.flink.table.plan.logical.{LogicalNode, Project}
 
 import scala.collection.mutable
@@ -221,6 +222,54 @@ object ProjectionTranslator {
     projectList
   }
 
+  def resolveOverWindows(
+      exprs: Seq[Expression],
+      overWindows: Array[OverWindow],
+      tEnv: TableEnvironment): Seq[Expression] = {
+
+    def resolveOverWindow(unresolvedCall: UnresolvedOverCall): Expression = {
+
+      val overWindow = overWindows.find(_.alias.equals(unresolvedCall.alias))
+      if (overWindow.isDefined) {
+        if (tEnv.isInstanceOf[StreamTableEnvironment]) {
+          val timeIndicator = overWindow.get.orderBy match {
+            case u: UnresolvedFieldReference if u.name.toLowerCase == "rowtime" =>
+              RowTime()
+            case u: UnresolvedFieldReference if u.name.toLowerCase == "proctime" =>
+              ProcTime()
+            case e: Expression => e
+          }
+          OverCall(
+            unresolvedCall.agg,
+            overWindow.get.partitionBy,
+            timeIndicator,
+            overWindow.get.preceding,
+            overWindow.get.following)
+        } else {
+          OverCall(
+            unresolvedCall.agg,
+            overWindow.get.partitionBy,
+            overWindow.get.orderBy,
+            overWindow.get.preceding,
+            overWindow.get.following)
+        }
+      } else {
+        unresolvedCall
+      }
+    }
+
+    val projectList = new ListBuffer[Expression]
+    exprs.foreach {
+      case Alias(u: UnresolvedOverCall, name, _) =>
+        projectList += Alias(resolveOverWindow(u), name)
+      case u: UnresolvedOverCall =>
+        projectList += resolveOverWindow(u)
+      case e: Expression => projectList += e
+    }
+    projectList
+  }
+
   /**
     * Extract all field references from the given expressions.
     *
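
On a StreamTableEnvironment, resolveOverWindows() rewrites an orderBy on the
logical 'rowtime / 'proctime attributes (matched case-insensitively) into the
internal RowTime() / ProcTime() time indicators before building the OverCall;
for other environments the orderBy expression is passed through unchanged:

    // streaming: 'rowtime  -> RowTime()   (event time)
    //            'proctime -> ProcTime() (processing time)
    Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w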

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
new file mode 100644
index 0000000..8464ba1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.RowTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+    val data = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (3L, 3, "Hello"),
+      (4L, 4, "Hello"),
+      (5L, 5, "Hello"),
+      (6L, 6, "Hello"),
+      (7L, 7, "Hello World"),
+      (8L, 8, "Hello World"),
+      (20L, 20, "Hello World"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select('c, 'b.count over 'w as 'mycount)
+      .select('c, 'mycount)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
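For comparison, the query above is the Table API form of an unbounded, partitioned ROWS OVER aggregation. A hedged sketch of the roughly equivalent SQL, assuming the SQL over-window support of the same release (the exact proc-time syntax may differ):

    // Sketch only; not part of this test.
    val equivalentSql =
      """SELECT c, COUNT(b) OVER (PARTITION BY c ORDER BY procTime()
        |  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS mycount
        |FROM MyTable""".stripMargin
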
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+    val table = env
+      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv).as('a, 'b, 'c)
+
+    val windowedTable = table
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
+         CURRENT_RANGE as 'w)
+      .select(
+        'a, 'b, 'c,
+        'b.sum over 'w,
+        'b.count over 'w,
+        'b.avg over 'w,
+        'b.max over 'w,
+        'b.min over 'w)
+
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hello,6,3,2,3,1",
+      "1,2,Hello,6,3,2,3,1",
+      "1,3,Hello world,6,3,2,3,1",
+      "1,1,Hi,7,4,1,3,1",
+      "2,1,Hello,1,1,1,1,1",
+      "2,2,Hello world,6,3,2,3,1",
+      "2,3,Hello world,6,3,2,3,1",
+      "1,4,Hello world,11,5,2,4,1",
+      "1,5,Hello world,29,8,3,7,1",
+      "1,6,Hello world,29,8,3,7,1",
+      "1,7,Hello world,29,8,3,7,1",
+      "2,4,Hello world,15,5,3,5,1",
+      "2,5,Hello world,15,5,3,5,1"
+    )
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
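Note that avg over the Long column is integer-valued: for the "Hi" row of partition 1 the window contains the b values {1, 1, 2, 3}, so sum = 7, count = 4 and avg = 7 / 4 = 1, which matches "1,1,Hi,7,4,1,3,1" above. A throwaway check of that arithmetic:

    // Window contents for the (1, 1L, "Hi") row at timestamp 14000005.
    val bs = Seq(1L, 1L, 2L, 3L)
    assert((bs.sum, bs.size, bs.sum / bs.size, bs.max, bs.min) == (7L, 4, 1L, 3L, 1L))
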
+
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
+
+    val data = List(
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+    val windowedTable = table
+      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
+      .select('a, 'c.sum over 'w, 'c.min over 'w)
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,1",
+      "2,3,1",
+      "3,3,3",
+      "3,7,3",
+      "3,12,3",
+      "4,6,6",
+      "4,13,6",
+      "4,21,6",
+      "4,30,6",
+      "5,10,10",
+      "5,21,10",
+      "5,33,10",
+      "5,46,10",
+      "5,60,10")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
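As a quick sanity check on the expected values: within partition a = 4 the 'c values arrive in order 6, 7, 8, 9, so the sums over the current row plus up to four preceding rows are 6, 13, 21 and 30, matching the "4,..." rows above:

    // Illustration only: running sum over at most 4 preceding rows plus the current row.
    val cs = Seq(6, 7, 8, 9)
    val sums = cs.indices.map(i => cs.slice(math.max(0, i - 4), i + 1).sum)
    assert(sums == Seq(6, 13, 21, 30))
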
+
+  @Test
+  def testRowTimeBoundedPartitionedRowOver(): Unit = {
+    val data = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val table = env.addSource[(Long, Int, String)](
+      new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+      "Hello,6,3,15",
+      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val table = env.addSource[(Long, Int, String)](
+      new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c)
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
+      .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
+
+    val result = windowedTable.toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+      "Hello,3,4,9",
+      "Hello,4,2,7",
+      "Hello,5,2,9",
+      "Hello,6,2,11", "Hello,65,2,12",
+      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", 
"Hello World,18,1,7",
+      "Hello World,8,2,15",
+      "Hello World,20,1,20")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
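The expected row "Hello World,18,1,7" shows the effect of the one-second RANGE bound: the (7L, 18, "Hello World") record has timestamp 14000, and the earlier "Hello World" rows (timestamps 10000 and 11000) lie outside [13000, 14000], so only the current row contributes (count = 1, sum of 'a = 7). A throwaway check:

    // (timestamp, a) pairs of the "Hello World" partition up to t = 14000.
    val rows = Seq((10000L, 7L), (11000L, 7L), (11000L, 7L), (14000L, 7L))
    val ts = 14000L
    val inRange = rows.filter { case (t, _) => t >= ts - 1000L && t <= ts }
    assert((inRange.size, inRange.map(_._2).sum) == (1, 7L))
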
+}
+
+object OverWindowITCase {
+
+  /** Test source that interleaves timestamped records (Left) with watermarks (Right). */
+  class RowTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        // Left((timestamp, value)): emit the value with an explicit event-time timestamp.
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        // Right(watermark): advance event time past all earlier timestamps.
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    // The source is finite and completes on its own; cancel is never called in these tests.
+    override def cancel(): Unit = ???
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe018921/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
new file mode 100644
index 0000000..26f4ead
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'x)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testOrderBy(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingAndFollowingUsingIsLiteral(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingAndFollowingUsingSameType(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPartitionByWithUnresolved(): Unit = {
+    val result = table
+      .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPartitionByWithNotKeyType(): Unit = {
+    val table2 = streamUtil.addTable[(Int, String, Either[Long, String])]("MyTable2", 'a, 'b, 'c)
+
+    val result = table2
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testPrecedingValue(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding -1.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFollowingValue(): Unit = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
+      .select('c, 'b.count over 'w)
+    streamUtil.tEnv.optimize(result.getRelNode)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver() = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'b.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "b", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "b", "c", "PROCTIME", "COUNT(b) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+    streamUtil.verifyTable(result, expected)
+  }
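To inspect such optimized plans outside of the test harness, the table environment's explain method can be used. A hedged sketch, assuming StreamTableEnvironment#explain(Table) as available in this version:

    // Sketch: print the optimized plan of an over-windowed table.
    val windowed = table
      .window(Over partitionBy 'c orderBy 'proctime preceding 2.rows following CURRENT_ROW as 'w)
      .select('c, 'b.count over 'w)
    println(streamUtil.tEnv.explain(windowed))
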
+
+  @Test
+  def testProcTimeBoundedPartitionedRangeOver() = {
+    val result = table
+      .window(
+        Over partitionBy 'a orderBy 'proctime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('a, 'c.avg over 'w as 'myAvg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "PROCTIME",
+            "AVG(c) AS w0$o0"
+          )
+        ),
+        term("select", "a", "w0$o0 AS myAvg")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRangeOver() = {
+    val result = table
+      .window(Over orderBy 'proctime preceding 10.second as 'w)
+      .select('a, 'c.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeBoundedNonPartitionedRowsOver() = {
+    val result = table
+      .window(Over orderBy 'proctime preceding 2.rows as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRangeOver() = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE following
+         CURRENT_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "PROCTIME",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedPartitionedRowsOver() = {
+    val result = table
+      .window(
+        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRangeOver() = {
+    val result = table
+      .window(
+        Over orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "PROCTIME",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProcTimeUnboundedNonPartitionedRowsOver() = {
+    val result = table
+      .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowsOver() = {
+    val result = table
+      .window(
+        Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'b.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "b", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "b", "c", "ROWTIME", "COUNT(b) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver() = {
+    val result = table
+      .window(
+        Over partitionBy 'a orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('a, 'c.avg over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "a"),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "ROWTIME",
+            "AVG(c) AS w0$o0"
+          )
+        ),
+        term("select", "a", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRangeOver() = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 10.second as 'w)
+      .select('a, 'c.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(c) AS w0$o0")
+        ),
+        term("select", "a", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeBoundedNonPartitionedRowsOver() = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding 2.rows as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRangeOver() = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE following
+         CURRENT_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "ROWTIME",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedPartitionedRowsOver() = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+         CURRENT_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRangeOver() = {
+    val result = table
+      .window(
+        Over orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select('a, 'c, 'a.count over 'w, 'a.sum over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term(
+            "select",
+            "a",
+            "c",
+            "ROWTIME",
+            "COUNT(a) AS w0$o0",
+            "SUM(a) AS w0$o1"
+          )
+        ),
+        term(
+          "select",
+          "a",
+          "c",
+          "w0$o0 AS _c2",
+          "w0$o1 AS _c3"
+        )
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+  @Test
+  def testRowTimeUnboundedNonPartitionedRowsOver() = {
+    val result = table
+      .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
+      .select('c, 'a.count over 'w)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "ROWTIME() AS $2")
+          ),
+          term("orderBy", "ROWTIME"),
+          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS _c1")
+      )
+
+    streamUtil.verifyTable(result, expected)
+  }
+
+}
+
+object OverWindowTest {
+  case class Pojo(id: Long, name: String)
+}
