[FLINK-6261] [table] Support TUMBLE, HOP, SESSION group window functions for SQL queries on batch tables.

- Drop support for group window translation of "GROUP BY FLOOR/CEIL".
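
As an illustration, a query of the following shape is now also translated for
batch tables (table and column names are hypothetical; "ts" stands for an
attribute of type TIMESTAMP):

    SELECT user, SUM(amount)
    FROM Orders
    GROUP BY TUMBLE(ts, INTERVAL '1' DAY), user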

This closes #3675.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63539475
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63539475
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63539475

Branch: refs/heads/table-retraction
Commit: 635394751dce6e532fcd5a758c3d1bdb25303712
Parents: e2a4f47
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Apr 4 15:19:25 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Apr 6 22:37:45 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  90 +++++--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   7 +-
 .../common/LogicalWindowAggregateRule.scala     | 144 +++++++++++
 .../DataSetLogicalWindowAggregateRule.scala     |  92 +++++++
 .../DataStreamLogicalWindowAggregateRule.scala  | 112 +++++++++
 .../datastream/LogicalWindowAggregateRule.scala | 222 -----------------
 .../scala/batch/sql/AggregationsITCase.scala    |  92 ++++++-
 .../scala/batch/sql/WindowAggregateTest.scala   | 244 +++++++++++++++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  86 -------
 9 files changed, 762 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 7156393..6f96920 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1418,45 +1418,103 @@ val result2 = tableEnv.sql(
 </div>
 </div>
 
-#### Group windows
+#### Limitations
+
+Joins, set operations, and non-windowed aggregations are not supported yet.
+
+{% top %}
+
+### Group Windows
 
-Streaming SQL supports aggregation on group windows by specifying the windows in the `GROUP BY` clause. The following table describes the syntax of the group windows:
+Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group window functions are supported for SQL on batch and streaming tables.
 
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th><code>GROUP BY</code> clause</th>
+      <th class="text-left" style="width: 30%">Group Window Function</th>
       <th class="text-left">Description</th>
     </tr>
   </thead>
 
   <tbody>
     <tr>
-      <td><code>TUMBLE(mode, interval)</code></td>
-      <td>A tumbling window over the time period specified by <code>interval</code>.</td>
+      <td><code>TUMBLE(time_attr, interval)</code></td>
+      <td>Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minute intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
     <tr>
-      <td><code>HOP(mode, slide, size)</code></td>
-      <td>A sliding window with the length of <code>size</code> and moves every <code>slide</code>.</td>
+      <td><code>HOP(time_attr, interval, interval)</code></td>
+      <td>Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second <code>interval</code> parameter) and hops by a specified hop interval (first <code>interval</code> parameter). If the hop interval is smaller than the window size, hopping windows overlap and rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and a 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
     <tr>
-      <td><code>SESSION(mode, gap)</code></td>
-      <td>A session window that has <code>gap</code> as the gap between two windows.</td>
+      <td><code>SESSION(time_attr, interval)</code></td>
+      <td>Defines a session time window. Session time windows do not have a fixed duration; their bounds are defined by a time <code>interval</code> of inactivity, i.e., a session window is closed if no event appears for the defined gap period. For example, a session window with a 30 minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
   </tbody>
 </table>
 
-The parameters `interval`, `slide`, `size`, `gap` must be constant time intervals. The `mode` can be either `proctime()` or `rowtime()`, which specifies the window is over the processing time or the event time.
+For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event and processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
-As an example, the following SQL computes the total number of records over a 15 minute tumbling window over processing time:
+The following examples show how to specify SQL queries with group windows on streaming tables; a batch table example is sketched after the streaming examples.
 
-```
-SELECT COUNT(*) FROM $table GROUP BY TUMBLE(proctime(), INTERVAL '15' MINUTE)
-```
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-#### Limitations
+// ingest a DataStream from an external source
+DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "user, product, amount");
+
+// compute SUM(amount) per day (in event-time)
+Table result1 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+
+// compute SUM(amount) per day (in processing-time)
+Table result2 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user");
+
+// compute every hour the SUM(amount) of the last 24 hours in event-time
+Table result3 = tableEnv.sql(
+  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
 
-The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not fully supported yet.
+// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
+Table result4 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(Long, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
+
+// compute SUM(amount) per day (in event-time)
+val result1 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")
+
+// compute SUM(amount) per day (in processing-time)
+val result2 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user")
+
+// compute every hour the SUM(amount) of the last 24 hours in event-time
+val result3 = tableEnv.sql(
+  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
+
+// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
+val result4 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")
+
+{% endhighlight %}
+</div>
+</div>
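+
+The following sketch shows the corresponding pattern for SQL on batch tables (the source `DataSet` and its field names are illustrative, and `ts` is assumed to be an attribute of type `TIMESTAMP`, i.e., `java.sql.Timestamp`):
+
+<div class="codetabs" markdown="1">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataSet with a TIMESTAMP attribute 'ts from an external source
+val ds: DataSet[(Long, String, Int, Timestamp)] = env.fromCollection(...)
+// register the DataSet as table "Orders"
+tableEnv.registerDataSet("Orders", ds, 'user, 'product, 'amount, 'ts)
+
+// compute SUM(amount) per day (tumbling window over the 'ts attribute)
+val result = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), user")
+{% endhighlight %}
+</div>
+</div>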
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 5caaf1f..222021a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -35,7 +35,10 @@ object FlinkRuleSets {
     ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
     ReduceExpressionsRule.JOIN_INSTANCE,
-    ProjectToWindowRule.PROJECT
+    ProjectToWindowRule.PROJECT,
+
+    // Transform window to LogicalWindowAggregate
+    DataSetLogicalWindowAggregateRule.INSTANCE
   )
 
   /**
@@ -132,7 +135,7 @@ object FlinkRuleSets {
     */
   val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
     // Transform window to LogicalWindowAggregate
-    LogicalWindowAggregateRule.INSTANCE,
+    DataStreamLogicalWindowAggregateRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..34433f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.common
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import _root_.scala.collection.JavaConversions._
+
+abstract class LogicalWindowAggregateRule(ruleName: String)
+  extends RelOptRule(
+    RelOptRule.operand(classOf[LogicalAggregate],
+      RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())),
+    ruleName) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+
+    val windowExpressions = getWindowExpressions(agg)
+    if (windowExpressions.length > 1) {
+      throw new TableException("Only a single window group function may be 
used in GROUP BY")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator && windowExpressions.nonEmpty
+  }
+
+  /**
+    * Transform LogicalAggregate with windowing expression to LogicalProject
+    * + LogicalWindowAggregate + LogicalProject.
+    *
+    * The transformation adds an additional LogicalProject at the top to ensure
+    * that the types are equivalent.
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val agg = call.rel[LogicalAggregate](0)
+    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+
+    val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head
+    val window = translateWindowExpression(windowExpr, project.getInput.getRowType)
+
+    val builder = call.builder()
+    val rexBuilder = builder.getRexBuilder
+
+    val inAggGroupExpression = getInAggregateGroupExpression(rexBuilder, windowExpr)
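+    // drop the window expression from the grouping keys; the windowing itself
+    // is represented by the LogicalWindowAggregate created below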
+    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+    val newAgg = builder
+      .push(project.getInput)
+      .project(project.getChildExps.updated(windowExprIdx, inAggGroupExpression))
+      .aggregate(builder.groupKey(
+        newGroupSet,
+        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
+      .build().asInstanceOf[LogicalAggregate]
+
+    // Create an additional project to conform with types
+    val outAggGroupExpression = getOutAggregateGroupExpression(rexBuilder, windowExpr)
+    val transformed = call.builder()
+    transformed.push(LogicalWindowAggregate.create(
+      window.toLogicalWindow,
+      Seq[NamedWindowProperty](),
+      newAgg))
+      .project(transformed.fields().patch(windowExprIdx, Seq(outAggGroupExpression), 0))
+
+    call.transformTo(transformed.build())
+  }
+
+  private[table] def getWindowExpressions(agg: LogicalAggregate): Seq[(RexCall, Int)] = {
+
+    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+    val groupKeys = agg.getGroupSet
+
+    // get grouping expressions
+    val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
+
+    // filter grouping expressions for window expressions
+    groupExpr.filter { g =>
+      g._1 match {
+        case call: RexCall =>
+          call.getOperator match {
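+            // only the variants without the optional alignment operand are
+            // supported; group windows with an offset raise a TableException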
+            case SqlStdOperatorTable.TUMBLE =>
+              if (call.getOperands.size() == 2) {
+                true
+              } else {
+                throw TableException("TUMBLE window with alignment is not 
supported yet.")
+              }
+            case SqlStdOperatorTable.HOP =>
+              if (call.getOperands.size() == 3) {
+                true
+              } else {
+                throw TableException("HOP window with alignment is not 
supported yet.")
+              }
+            case SqlStdOperatorTable.SESSION =>
+              if (call.getOperands.size() == 2) {
+                true
+              } else {
+                throw TableException("SESSION window with alignment is not 
supported yet.")
+              }
+            case _ => false
+          }
+        case _ => false
+      }
+    }.map(w => (w._1.asInstanceOf[RexCall], w._2))
+  }
+
+  /** Returns the expression that replaces the window expression before the aggregation. */
+  private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode
+
+  /** Returns the expression that replaces the window expression after the aggregation. */
+  private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode
+
+  /** Translates the group window expression into a Flink Table window. */
+  private[table] def translateWindowExpression(windowExpr: RexCall, rowType: RelDataType): Window
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..883f5ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import java.math.BigDecimal
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.api.{TableException, Window}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+class DataSetLogicalWindowAggregateRule
+  extends LogicalWindowAggregateRule("DataSetLogicalWindowAggregateRule") {
+
+  /** Returns the operand of the group window function. */
+  override private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = windowExpression.getOperands.get(0)
+
+  /** Returns a zero literal of the correct type. */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    val literalType = windowExpression.getOperands.get(0).getType
+    rexBuilder.makeZeroLiteral(literalType)
+  }
+
+  override private[table] def translateWindowExpression(
+      windowExpr: RexCall,
+      rowType: RelDataType): Window = {
+
+    def getOperandAsLong(call: RexCall, idx: Int): Long =
+      call.getOperands.get(idx) match {
+        case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
+        case _ => throw new TableException("Only constant window descriptors 
are supported")
+      }
+
+    def getFieldReference(operand: RexNode): Expression = {
+      operand match {
+        case ref: RexInputRef =>
+          // resolve field name of window attribute
+          val fieldName = rowType.getFieldList.get(ref.getIndex).getName
+          val fieldType = rowType.getFieldList.get(ref.getIndex).getType
+          ResolvedFieldReference(fieldName, FlinkTypeFactory.toTypeInfo(fieldType))
+      }
+    }
+
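+    // map the SQL group window function to the equivalent Table API window;
+    // "w$" is the internal alias under which the group window is referenced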
+    windowExpr.getOperator match {
+      case SqlStdOperatorTable.TUMBLE =>
+        val interval = getOperandAsLong(windowExpr, 1)
+        val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+
+      case SqlStdOperatorTable.HOP =>
+        val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
+        val w = Slide
+          .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+          .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+
+      case SqlStdOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowExpr, 1)
+        val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+    }
+  }
+}
+
+object DataSetLogicalWindowAggregateRule {
+  val INSTANCE = new DataSetLogicalWindowAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..175a202
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.api.{TableException, Window}
+import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.functions.TimeModeTypes
+import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+class DataStreamLogicalWindowAggregateRule
+  extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
+
+  /** Returns a zero literal of the correct time type */
+  override private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
+
+  /** Returns a zero literal of the correct time type */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
+
+  private def createZeroLiteral(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
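+    // replace the window expression by a zero literal that is cast to the
+    // original time-indicator type; this keeps the rewritten aggregate
+    // type-equivalent while the actual windowing is performed by the
+    // LogicalWindowAggregate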
+    val timeType = windowExpression.operands.get(0).getType
+    timeType match {
+      case TimeModeTypes.ROWTIME =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.ROWTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
+      case TimeModeTypes.PROCTIME =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.PROCTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+      case _ =>
+        throw TableException(s"""Unexpected time type $timeType encountered""")
+    }
+  }
+
+  override private[table] def translateWindowExpression(
+      windowExpr: RexCall,
+      rowType: RelDataType): Window = {
+
+    def getOperandAsLong(call: RexCall, idx: Int): Long =
+      call.getOperands.get(idx) match {
+        case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
+        case _ => throw new TableException("Only constant window descriptors 
are supported")
+      }
+
+    windowExpr.getOperator match {
+      case SqlStdOperatorTable.TUMBLE =>
+        val interval = getOperandAsLong(windowExpr, 1)
+        val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+
+      case SqlStdOperatorTable.HOP =>
+        val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
+        val w = Slide
+          .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+          .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+      case SqlStdOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowExpr, 1)
+        val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+    }
+  }
+}
+
+object DataStreamLogicalWindowAggregateRule {
+  val INSTANCE = new DataStreamLogicalWindowAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
deleted file mode 100644
index 7572e46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.rules.datastream
-
-import java.math.BigDecimal
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.plan._
-import org.apache.calcite.plan.hep.HepRelVertex
-import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
-import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
-import org.apache.calcite.sql.fun.{SqlFloorFunction, SqlStdOperatorTable}
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.TimeModeTypes
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-import _root_.scala.collection.JavaConversions._
-
-class LogicalWindowAggregateRule
-  extends RelOptRule(
-    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
-    "LogicalWindowAggregateRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
-
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-
-    val windowClause = recognizeWindow(agg)
-    !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
-  }
-
-  /**
-    * Transform LogicalAggregate with windowing expression to LogicalProject
-    * + LogicalWindowAggregate + LogicalProject.
-    *
-    * The transformation adds an additional LogicalProject at the top to ensure
-    * that the types are equivalent.
-    */
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val agg = call.rel[LogicalAggregate](0)
-    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
-    val (windowExprIdx, window) = recognizeWindow(agg).get
-    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
-
-    val builder = call.builder()
-    val rexBuilder = builder.getRexBuilder
-
-    // build dummy literal with type depending on time semantics
-    val zero = window match {
-      case _: EventTimeWindow =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.ROWTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
-      case _ =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.PROCTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
-    }
-
-    val newAgg = builder
-      .push(project.getInput)
-      .project(project.getChildExps.updated(windowExprIdx, zero))
-      .aggregate(builder.groupKey(
-        newGroupSet,
-        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
-      .build().asInstanceOf[LogicalAggregate]
-
-    // Create an additional project to conform with types
-    val transformed = call.builder()
-    transformed.push(LogicalWindowAggregate.create(
-      window.toLogicalWindow,
-      Seq[NamedWindowProperty](),
-      newAgg))
-      .project(transformed.fields().patch(windowExprIdx, Seq(zero), 0))
-    call.transformTo(transformed.build())
-  }
-
-  private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = {
-    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
-    val groupKeys = agg.getGroupSet
-
-    // filter expressions on which is grouped
-    val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
-
-    // check for window expressions in group expressions
-    val windowExpr = groupExpr
-      .map(g => (g._2, identifyWindow(g._1)) )
-      .filter(_._2.isDefined)
-      .map(g => (g._1, g._2.get.as("w$")) )
-
-    windowExpr.size match {
-      case 0 => None
-      case 1 => Some(windowExpr.head)
-      case _ => throw new TableException("Multiple windows are not supported")
-    }
-  }
-
-  private def identifyWindow(field: RexNode): Option[Window] = {
-    field match {
-      case call: RexCall =>
-        call.getOperator match {
-          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow
-          case _ => None
-        }
-      case _ => None
-    }
-  }
-}
-
-private abstract class WindowTranslator {
-  val call: RexCall
-
-  protected def unwrapLiteral[T](node: RexNode): T =
-    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
-
-  protected def getOperandAsLong(idx: Int): Long =
-    call.getOperands.get(idx) match {
-      case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
-      case _ => throw new TableException("Only constant window descriptors are 
supported")
-    }
-
-  def toWindow: Option[Window]
-}
-
-private case class FloorWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-    val range = unwrapLiteral[TimeUnitRange](call.getOperands.get(1))
-    val w = Tumble.over(Literal(range.startUnit.multiplier.longValue(),
-      TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class TumbleWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 2) {
-      throw new TableException("TUMBLE with alignment is not supported yet.")
-    }
-
-    val interval = getOperandAsLong(1)
-    val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class SlidingWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 3) {
-      throw new TableException("HOP with alignment is not supported yet.")
-    }
-
-    val (slide, size) = (getOperandAsLong(1), getOperandAsLong(2))
-    val w = Slide
-      .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-      .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class SessionWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 2) {
-      throw new TableException("SESSION with alignment is not supported yet")
-    }
-
-    val gap = getOperandAsLong(1)
-    val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-object LogicalWindowAggregateRule {
-  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
-    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
-
-  private[flink] val INSTANCE = new LogicalWindowAggregateRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
index cceb272..600c15b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -18,13 +18,15 @@
 
 package org.apache.flink.table.api.scala.batch.sql
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -292,4 +294,92 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results2.asJava, expected2)
     TestBaseUtils.compareResultAsText(results3.asJava, expected3)
   }
+
+  @Test
+  def testTumbleWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), COUNT(*)" +
+        "FROM T " +
+        "GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1",
+      "2,2,1", "2,3,1",
+      "3,9,2", "3,6,1",
+      "4,15,2", "4,19,2",
+      "5,11,1", "5,39,3", "5,15,1",
+      "6,33,2", "6,57,3", "6,21,1"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testHopWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), COUNT(*)" +
+        "FROM T " +
+        "GROUP BY b, HOP(ts, INTERVAL '2' SECOND, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1","1,1,1",
+      "2,5,2","2,5,2",
+      "3,9,2", "3,15,3", "3,6,1",
+      "4,7,1", "4,24,3", "4,27,3", "4,10,1",
+      "5,11,1", "5,36,3", "5,54,4", "5,29,2",
+      "6,33,2", "6,70,4", "6,78,4", "6,41,2"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testSessionWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT MIN(a), MAX(a), SUM(a), COUNT(*)" +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .filter(x => (x._2 % 2) == 0)
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "2,10,39,6",
+      "16,21,111,6"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
new file mode 100644
index 0000000..e84ede6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class WindowAggregateTest extends TableTestBase {
+
+  @Test
+  def testNonPartitionedTumbleWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB FROM T GROUP BY TUMBLE(ts, 
INTERVAL '2' HOUR)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a, b")
+          ),
+          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
7200000.millis)),
+          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+        ),
+        term("select", "sumA, cntB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedTumbleWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT c, SUM(a) AS sumA, MIN(b) AS minB FROM T GROUP BY TUMBLE(ts, 
INTERVAL '4' MINUTE), c"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c"),
+          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
240000.millis)),
+          term("select", "c, SUM(a) AS sumA, MIN(b) AS minB")
+        ),
+        term("select", "c, sumA, minB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testNonPartitionedHopWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '90' MINUTE)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a, b")
+          ),
+          term("window",
+            EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 900000.millis)),
+          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+        ),
+        term("select", "sumA, cntB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedHopWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 
'ts)
+
+    val sqlQuery =
+      "SELECT c, SUM(a) AS sumA, AVG(b) AS avgB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c, d"),
+          term("window",
+            EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)),
+          term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB")
+        ),
+        term("select", "c, sumA, avgB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testNonPartitionedSessionWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT COUNT(*) AS cnt FROM T GROUP BY SESSION(ts, INTERVAL '30' 
MINUTE)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts")
+          ),
+          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
1800000.millis)),
+          term("select", "COUNT(*) AS cnt")
+        ),
+        term("select", "cnt")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedSessionWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 
'ts)
+
+    val sqlQuery =
+      "SELECT c, d, SUM(a) AS sumA, MIN(b) AS minB " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c, d"),
+          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
43200000.millis)),
+          term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB")
+        ),
+        term("select", "c, d, sumA, minB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testTumbleWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY TUMBLE(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testHopWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME 
'10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testSessionWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testVariableWindowSize(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql = "SELECT COUNT(*) " +
+      "FROM T " +
+      "GROUP BY TUMBLE(proctime(), b * INTERVAL '1' MINUTE)"
+    util.verifySql(sql, "n/a")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 1c1752f..f4befa6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -85,92 +85,6 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
-  def testNonPartitionedTumbleWindow() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
-          ),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 
3600000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
-        ),
-        term("select", "EXPR$0")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testPartitionedTumbleWindow1() = {
-    val sql = "SELECT a, COUNT(*) FROM MyTable GROUP BY a, FLOOR(rowtime() TO 
MINUTE)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1")
-          ),
-          term("groupBy", "a"),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 
60000.millis)),
-          term("select", "a", "COUNT(*) AS EXPR$1")
-        ),
-        term("select", "a", "EXPR$1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testPartitionedTumbleWindow2() = {
-    val sql = "SELECT a, SUM(c), b FROM MyTable GROUP BY a, FLOOR(rowtime() TO 
SECOND), b"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c")
-          ),
-          term("groupBy", "a, b"),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 
1000.millis)),
-          term("select", "a", "b", "SUM(c) AS EXPR$1")
-        ),
-        term("select", "a", "EXPR$1", "b")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcessingTime() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(proctime() TO HOUR)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
-          ),
-          term("window", ProcessingTimeTumblingGroupWindow(Some('w$), 
3600000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
-        ),
-        term("select", "EXPR$0")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
   def testTumbleFunction() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), 
INTERVAL '15' MINUTE)"
     val expected =
