[FLINK-6011] [table] Support TUMBLE, HOP, SESSION group window functions in SQL queries on streams.
This closes #3665. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1649f352 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1649f352 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1649f352 Branch: refs/heads/master Commit: 1649f35200705111a155962af33829f24acefd11 Parents: a6add62 Author: Haohui Mai <whe...@apache.org> Authored: Mon Apr 3 14:32:09 2017 -0700 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Apr 5 12:27:28 2017 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 41 ++++++- .../datastream/LogicalWindowAggregateRule.scala | 118 ++++++++++++++----- .../flink/table/validate/FunctionCatalog.scala | 5 +- .../scala/stream/sql/WindowAggregateTest.scala | 108 ++++++++++++++++- 4 files changed, 237 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 7c37aea..7156393 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1418,9 +1418,45 @@ val result2 = tableEnv.sql( </div> </div> +#### Group windows + +Streaming SQL supports aggregation on group windows by specifying the windows in the `GROUP BY` clause. 
The following table describes the syntax of the group windows: + +<table class="table table-bordered"> + <thead> + <tr> + <th><code>GROUP BY</code> clause</th> + <th class="text-left">Description</th> + </tr> + </thead> + + <tbody> + <tr> + <td><code>TUMBLE(mode, interval)</code></td> + <td>A tumbling window over the time period specified by <code>interval</code>.</td> + </tr> + <tr> + <td><code>HOP(mode, slide, size)</code></td> + <td>A sliding window of length <code>size</code> that moves forward every <code>slide</code>.</td> + </tr> + <tr> + <td><code>SESSION(mode, gap)</code></td> + <td>A session window that has <code>gap</code> as the gap between two windows.</td> + </tr> + </tbody> +</table> + +The parameters `interval`, `slide`, `size`, `gap` must be constant time intervals. The `mode` can be either `proctime()` or `rowtime()`, which specifies whether the window is defined over processing time or event time, respectively. + +As an example, the following SQL computes the total number of records over a 15-minute tumbling window over processing time: + +``` +SELECT COUNT(*) FROM $table GROUP BY TUMBLE(proctime(), INTERVAL '15' MINUTE) +``` + #### Limitations -The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not supported yet. +The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not fully supported yet. 
{% top %} @@ -5093,8 +5129,7 @@ The following operations are not supported yet: - Collection functions - Aggregate functions like STDDEV_xxx, VAR_xxx, and REGR_xxx - Distinct aggregate functions like COUNT DISTINCT -- Window functions -- Grouping functions +- Row windows http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala index 9883957..7572e46 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala @@ -17,23 +17,25 @@ */ package org.apache.flink.table.plan.rules.datastream +import java.math.BigDecimal + import com.google.common.collect.ImmutableList import org.apache.calcite.avatica.util.TimeUnitRange import org.apache.calcite.plan._ import org.apache.calcite.plan.hep.HepRelVertex import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} -import org.apache.calcite.sql.fun.SqlFloorFunction +import org.apache.calcite.sql.fun.{SqlFloorFunction, SqlStdOperatorTable} import org.apache.calcite.util.ImmutableBitSet -import org.apache.flink.table.api.scala.Tumble -import org.apache.flink.table.api.{EventTimeWindow, TableException, TumblingWindow, Window} +import org.apache.flink.table.api._ +import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import 
org.apache.flink.table.expressions._ import org.apache.flink.table.functions.TimeModeTypes import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import org.apache.flink.table.typeutils.TimeIntervalTypeInfo -import scala.collection.JavaConversions._ +import _root_.scala.collection.JavaConversions._ class LogicalWindowAggregateRule extends RelOptRule( @@ -117,46 +119,104 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { - // Detects window expressions by pattern matching - // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), - // with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => - val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] - val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] - val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) - call.getType match { - case TimeModeTypes.PROCTIME => - return Some(w) - case TimeModeTypes.ROWTIME => - return Some(w.on("rowtime")) - case _ => - } - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } - None } +} + +private abstract class WindowTranslator { + val call: RexCall + + protected def unwrapLiteral[T](node: RexNode): T = + node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] + protected def getOperandAsLong(idx: Int): Long = + call.getOperands.get(idx) match { + case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue() + case _ => throw new TableException("Only constant window descriptors are supported") + } + + def toWindow: Option[Window] } -object LogicalWindowAggregateRule { +private case class 
FloorWindowTranslator(call: RexCall) extends WindowTranslator { + override def toWindow: Option[Window] = { + val range = unwrapLiteral[TimeUnitRange](call.getOperands.get(1)) + val w = Tumble.over(Literal(range.startUnit.multiplier.longValue(), + TimeIntervalTypeInfo.INTERVAL_MILLIS)) + call.getType match { + case TimeModeTypes.PROCTIME => Some(w) + case TimeModeTypes.ROWTIME => Some(w.on("rowtime")) + case _ => None + } + } +} - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], - RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) +private case class TumbleWindowTranslator(call: RexCall) extends WindowTranslator { + override def toWindow: Option[Window] = { - private[flink] val INSTANCE = new LogicalWindowAggregateRule + if (call.getOperands.size() != 2) { + throw new TableException("TUMBLE with alignment is not supported yet.") + } - private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = { - intervalToTumbleWindow(range.startUnit.multiplier.longValue()) + val interval = getOperandAsLong(1) + val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + call.getType match { + case TimeModeTypes.PROCTIME => Some(w) + case TimeModeTypes.ROWTIME => Some(w.on("rowtime")) + case _ => None + } } +} - private def intervalToTumbleWindow(size: Long): TumblingWindow = { - Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS) +private case class SlidingWindowTranslator(call: RexCall) extends WindowTranslator { + override def toWindow: Option[Window] = { + + if (call.getOperands.size() != 3) { + throw new TableException("HOP with alignment is not supported yet.") + } + + val (slide, size) = (getOperandAsLong(1), getOperandAsLong(2)) + val w = Slide + .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + call.getType match { + case TimeModeTypes.PROCTIME => Some(w) + case TimeModeTypes.ROWTIME => 
Some(w.on("rowtime")) + case _ => None + } } +} + +private case class SessionWindowTranslator(call: RexCall) extends WindowTranslator { + override def toWindow: Option[Window] = { + + if (call.getOperands.size() != 2) { + throw new TableException("SESSION with alignment is not supported yet") + } + val gap = getOperandAsLong(1) + val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) + call.getType match { + case TimeModeTypes.PROCTIME => Some(w) + case TimeModeTypes.ROWTIME => Some(w.on("rowtime")) + case _ => None + } + } } +object LogicalWindowAggregateRule { + private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], + RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + + private[flink] val INSTANCE = new LogicalWindowAggregateRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 224a370..6b8fd95 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -350,7 +350,10 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.EXISTS, // EXTENSIONS EventTimeExtractor, - ProcTimeExtractor + ProcTimeExtractor, + SqlStdOperatorTable.TUMBLE, + SqlStdOperatorTable.HOP, + SqlStdOperatorTable.SESSION ) builtInSqlOperators.foreach(register) http://git-wip-us.apache.org/repos/asf/flink/blob/1649f352/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 4e0d4fd..1c1752f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -17,12 +17,14 @@ */ package org.apache.flink.table.api.scala.stream.sql +import java.sql.Timestamp + import org.apache.flink.api.scala._ import org.apache.flink.table.api.TableException import org.apache.flink.table.api.scala._ -import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeTumblingGroupWindow} -import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, ProcessingTimeTumblingGroupWindow} import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test class WindowAggregateTest extends TableTestBase { @@ -168,6 +170,108 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + @Test + def testTumbleFunction() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "1970-01-01 00:00:00 AS $f0") + ), + term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 900000.millis)), + term("select", "COUNT(*) AS EXPR$0") + ), + term("select", "EXPR$0") + ) + streamUtil.verifySql(sql, 
expected) + } + + @Test + def testHoppingFunction() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " + + "HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "1970-01-01 00:00:00 AS $f0") + ), + term("window", ProcessingTimeSlidingGroupWindow(Some('w$), + 3600000.millis, 900000.millis)), + term("select", "COUNT(*) AS EXPR$0") + ), + term("select", "EXPR$0") + ) + streamUtil.verifySql(sql, expected) + } + + @Test + def testSessionFunction() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " + + "SESSION(proctime(), INTERVAL '15' MINUTE)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "1970-01-01 00:00:00 AS $f0") + ), + term("window", ProcessingTimeSessionGroupWindow(Some('w$), 900000.millis)), + term("select", "COUNT(*) AS EXPR$0") + ), + term("select", "EXPR$0") + ) + streamUtil.verifySql(sql, expected) + } + + @Test(expected = classOf[TableException]) + def testTumbleWindowNoOffset(): Unit = { + val sqlQuery = + "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + + "FROM MyTable " + + "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')" + + streamUtil.verifySql(sqlQuery, "n/a") + } + + @Test(expected = classOf[TableException]) + def testHopWindowNoOffset(): Unit = { + val sqlQuery = + "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + + "FROM MyTable " + + "GROUP BY HOP(proctime(), INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')" + + streamUtil.verifySql(sqlQuery, "n/a") + } + + @Test(expected = classOf[TableException]) + def testSessionWindowNoOffset(): Unit = { + val sqlQuery = + "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + + "FROM MyTable " + + "GROUP BY SESSION(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')" + + streamUtil.verifySql(sqlQuery, "n/a") + } + + 
@Test(expected = classOf[TableException]) + def testVariableWindowSize() = { + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime(), c * INTERVAL '1' MINUTE)" + streamUtil.verifySql(sql, "n/a") + } + @Test(expected = classOf[TableException]) def testMultiWindow() = { val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +