[FLINK-6011] [table] Support TUMBLE, HOP, SESSION group window functions in SQL 
queries on streams.

This closes #3665.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1649f352
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1649f352
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1649f352

Branch: refs/heads/table-retraction
Commit: 1649f35200705111a155962af33829f24acefd11
Parents: a6add62
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Apr 3 14:32:09 2017 -0700
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Apr 5 12:27:28 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  41 ++++++-
 .../datastream/LogicalWindowAggregateRule.scala | 118 ++++++++++++++-----
 .../flink/table/validate/FunctionCatalog.scala  |   5 +-
 .../scala/stream/sql/WindowAggregateTest.scala  | 108 ++++++++++++++++-
 4 files changed, 237 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 7c37aea..7156393 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1418,9 +1418,45 @@ val result2 = tableEnv.sql(
 </div>
 </div>
 
+#### Group windows
+
+Streaming SQL supports aggregation over group windows, which are specified in the `GROUP BY` clause. The following table describes the syntax of the supported group window functions:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th><code>GROUP BY</code> clause</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><code>TUMBLE(mode, interval)</code></td>
+      <td>A tumbling (non-overlapping) window whose length is given by <code>interval</code>.</td>
+    </tr>
+    <tr>
+      <td><code>HOP(mode, slide, size)</code></td>
+      <td>A sliding (hopping) window of length <code>size</code> that advances every <code>slide</code>; consecutive windows overlap when <code>slide</code> is smaller than <code>size</code>.</td>
+    </tr>
+    <tr>
+      <td><code>SESSION(mode, gap)</code></td>
+      <td>A session window; sessions are separated by a period of inactivity of at least <code>gap</code>.</td>
+    </tr>
+  </tbody>
+</table>
+
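+The parameters `interval`, `slide`, `size`, and `gap` must be constant time intervals, e.g., `INTERVAL '15' MINUTE`. The `mode` is either `proctime()` or `rowtime()`, which specifies whether the window is defined over processing time or event time. The optional alignment parameter of `TUMBLE`, `HOP`, and `SESSION` is not supported yet.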
+
+As an example, the following SQL query counts the records in each 15-minute tumbling window over processing time:
+
+```
+SELECT COUNT(*) FROM $table GROUP BY TUMBLE(proctime(), INTERVAL '15' MINUTE)
+```
+
 #### Limitations
 
-The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, 
and `UNION` clauses. Aggregations or joins are not supported yet.
+The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses, plus aggregations over the group windows described above. Joins and other kinds of aggregations are not supported yet.
 
 {% top %}
 
@@ -5093,8 +5129,7 @@ The following operations are not supported yet:
 - Collection functions
 - Aggregate functions like STDDEV_xxx, VAR_xxx, and REGR_xxx
 - Distinct aggregate functions like COUNT DISTINCT
-- Window functions
-- Grouping functions
+- Row windows
 
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index 9883957..7572e46 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -17,23 +17,25 @@
  */
 package org.apache.flink.table.plan.rules.datastream
 
+import java.math.BigDecimal
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.plan._
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
-import org.apache.calcite.sql.fun.SqlFloorFunction
+import org.apache.calcite.sql.fun.{SqlFloorFunction, SqlStdOperatorTable}
 import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.api.scala.Tumble
-import org.apache.flink.table.api.{EventTimeWindow, TableException, 
TumblingWindow, Window}
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.TimeModeTypes
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
-import scala.collection.JavaConversions._
+import _root_.scala.collection.JavaConversions._
 
 class LogicalWindowAggregateRule
   extends RelOptRule(
@@ -117,46 +119,104 @@ class LogicalWindowAggregateRule
   }
 
   private def identifyWindow(field: RexNode): Option[Window] = {
-    // Detects window expressions by pattern matching
-    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
-    //   with time being equal to proctime() or rowtime()
     field match {
       case call: RexCall =>
         call.getOperator match {
-          case _: SqlFloorFunction =>
-            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
-            val unit: TimeUnitRange = 
operand.getValue.asInstanceOf[TimeUnitRange]
-            val w = 
LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
-            call.getType match {
-              case TimeModeTypes.PROCTIME =>
-                return Some(w)
-              case TimeModeTypes.ROWTIME =>
-                return Some(w.on("rowtime"))
-              case _ =>
-            }
-          case _ =>
+          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
+          case SqlStdOperatorTable.TUMBLE => 
TumbleWindowTranslator(call).toWindow
+          case SqlStdOperatorTable.HOP => 
SlidingWindowTranslator(call).toWindow
+          case SqlStdOperatorTable.SESSION => 
SessionWindowTranslator(call).toWindow
+          case _ => None
         }
-      case _ =>
+      case _ => None
     }
-    None
   }
+}
+
+private abstract class WindowTranslator {
+  val call: RexCall
+
+  protected def unwrapLiteral[T](node: RexNode): T =
+    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
 
+  protected def getOperandAsLong(idx: Int): Long =
+    call.getOperands.get(idx) match {
+      case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
+      case _ => throw new TableException("Only constant window descriptors are 
supported")
+    }
+
+  def toWindow: Option[Window]
 }
 
-object LogicalWindowAggregateRule {
+private case class FloorWindowTranslator(call: RexCall) extends 
WindowTranslator {
+  override def toWindow: Option[Window] = {
+    val range = unwrapLiteral[TimeUnitRange](call.getOperands.get(1))
+    val w = Tumble.over(Literal(range.startUnit.multiplier.longValue(),
+      TimeIntervalTypeInfo.INTERVAL_MILLIS))
+    call.getType match {
+      case TimeModeTypes.PROCTIME => Some(w)
+      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
+      case _ => None
+    }
+  }
+}
 
-  private[flink] val LOGICAL_WINDOW_PREDICATE = 
RelOptRule.operand(classOf[LogicalAggregate],
-    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
+private case class TumbleWindowTranslator(call: RexCall) extends 
WindowTranslator {
+  override def toWindow: Option[Window] = {
 
-  private[flink] val INSTANCE = new LogicalWindowAggregateRule
+    if (call.getOperands.size() != 2) {
+      throw new TableException("TUMBLE with alignment is not supported yet.")
+    }
 
-  private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): 
TumblingWindow = {
-    intervalToTumbleWindow(range.startUnit.multiplier.longValue())
+    val interval = getOperandAsLong(1)
+    val w = Tumble.over(Literal(interval, 
TimeIntervalTypeInfo.INTERVAL_MILLIS))
+    call.getType match {
+      case TimeModeTypes.PROCTIME => Some(w)
+      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
+      case _ => None
+    }
   }
+}
 
-  private def intervalToTumbleWindow(size: Long): TumblingWindow = {
-    Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+private case class SlidingWindowTranslator(call: RexCall) extends 
WindowTranslator {
+  override def toWindow: Option[Window] = {
+
+    if (call.getOperands.size() != 3) {
+      throw new TableException("HOP with alignment is not supported yet.")
+    }
+
+    val (slide, size) = (getOperandAsLong(1), getOperandAsLong(2))
+    val w = Slide
+      .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+      .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+    call.getType match {
+      case TimeModeTypes.PROCTIME => Some(w)
+      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
+      case _ => None
+    }
   }
+}
+
+private case class SessionWindowTranslator(call: RexCall) extends 
WindowTranslator {
+  override def toWindow: Option[Window] = {
+
+    if (call.getOperands.size() != 2) {
+      throw new TableException("SESSION with alignment is not supported yet")
+    }
 
+    val gap = getOperandAsLong(1)
+    val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+    call.getType match {
+      case TimeModeTypes.PROCTIME => Some(w)
+      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
+      case _ => None
+    }
+  }
 }
 
+object LogicalWindowAggregateRule {
+  private[flink] val LOGICAL_WINDOW_PREDICATE = 
RelOptRule.operand(classOf[LogicalAggregate],
+    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
+
+  private[flink] val INSTANCE = new LogicalWindowAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 224a370..6b8fd95 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -350,7 +350,10 @@ class BasicOperatorTable extends 
ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.EXISTS,
     // EXTENSIONS
     EventTimeExtractor,
-    ProcTimeExtractor
+    ProcTimeExtractor,
+    SqlStdOperatorTable.TUMBLE,
+    SqlStdOperatorTable.HOP,
+    SqlStdOperatorTable.SESSION
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 4e0d4fd..1c1752f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -17,12 +17,14 @@
  */
 package org.apache.flink.table.api.scala.stream.sql
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, 
ProcessingTimeTumblingGroupWindow}
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, 
ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, 
ProcessingTimeTumblingGroupWindow}
 import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
 
 class WindowAggregateTest extends TableTestBase {
@@ -168,6 +170,108 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testTumbleFunction() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), 
INTERVAL '15' MINUTE)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "1970-01-01 00:00:00 AS $f0")
+          ),
+          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 
900000.millis)),
+          term("select", "COUNT(*) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testHoppingFunction() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
+      "HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "1970-01-01 00:00:00 AS $f0")
+          ),
+          term("window", ProcessingTimeSlidingGroupWindow(Some('w$),
+            3600000.millis, 900000.millis)),
+          term("select", "COUNT(*) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testSessionFunction() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
+      "SESSION(proctime(), INTERVAL '15' MINUTE)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "1970-01-01 00:00:00 AS $f0")
+          ),
+          term("window", ProcessingTimeSessionGroupWindow(Some('w$), 
900000.millis)),
+          term("select", "COUNT(*) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testTumbleWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testHopWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY HOP(proctime(), INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME 
'10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testSessionWindowNoOffset(): Unit = {
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM MyTable " +
+        "GROUP BY SESSION(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    streamUtil.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testVariableWindowSize() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime(), c * 
INTERVAL '1' MINUTE)"
+    streamUtil.verifySql(sql, "n/a")
+  }
+
   @Test(expected = classOf[TableException])
   def testMultiWindow() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +

Reply via email to