[FLINK-6697] [table] Add support for group window ROWTIME to batch SQL & Table API.
- Fixes [FLINK-7542] "AggregateITCase fails in different timezone." This closes #4796. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/babee277 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/babee277 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/babee277 Branch: refs/heads/master Commit: babee277204b6b1edcfe9c7c76348254019b2dd3 Parents: 14e0948 Author: Fabian Hueske <[email protected]> Authored: Sun Aug 6 23:55:56 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Wed Oct 25 22:01:39 2017 +0200 ---------------------------------------------------------------------- docs/dev/table/tableApi.md | 16 +-- .../calcite/sql/fun/SqlGroupFunction.java | 20 +++ .../table/expressions/fieldExpression.scala | 25 +++- .../rules/common/WindowPropertiesRule.scala | 70 ++++++---- .../DataSetLogicalWindowAggregateRule.scala | 7 +- .../DataStreamLogicalWindowAggregateRule.scala | 7 +- .../table/runtime/aggregate/AggregateUtil.scala | 12 +- ...SetSessionWindowAggReduceGroupFunction.scala | 8 +- ...SetSlideWindowAggReduceCombineFunction.scala | 4 + ...taSetSlideWindowAggReduceGroupFunction.scala | 14 +- ...mbleTimeWindowAggReduceCombineFunction.scala | 4 + ...TumbleTimeWindowAggReduceGroupFunction.scala | 10 +- ...rementalAggregateAllTimeWindowFunction.scala | 4 +- ...IncrementalAggregateTimeWindowFunction.scala | 4 +- .../aggregate/TimeWindowPropertyCollector.scala | 18 ++- .../flink/table/validate/FunctionCatalog.scala | 27 +++- .../table/api/batch/sql/GroupWindowTest.scala | 24 ++-- .../validation/GroupWindowValidationTest.scala | 17 +-- .../table/api/batch/table/GroupWindowTest.scala | 134 +++++++++++++++++++ .../runtime/batch/sql/AggregateITCase.scala | 123 +++++++++++++++-- .../runtime/batch/table/GroupWindowITCase.scala | 97 ++++++++------ .../table/runtime/stream/sql/SqlITCase.scala | 4 +- .../stream/table/GroupWindowITCase.scala | 26 ++-- 23 files changed, 511 insertions(+), 
164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/docs/dev/table/tableApi.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index f0c4605..2b45b5a 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -344,7 +344,7 @@ Table orders = tableEnv.scan("Orders"); Table result = orders .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window .groupBy("a, w") // group by key and window - .select("a, w.start, w.end, b.sum as d"); // access window properties and aggregate + .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate {% endhighlight %} </td> </tr> @@ -427,7 +427,7 @@ val orders: Table = tableEnv.scan("Orders") val result: Table = orders .window(Tumble over 5.minutes on 'rowtime as 'w) // define window .groupBy('a, 'w) // group by key and window - .select('a, w.start, 'w.end, 'b.sum as 'd) // access window properties and aggregate + .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate {% endhighlight %} </td> </tr> @@ -1178,7 +1178,7 @@ val table = input </div> </div> -Window properties such as the start and end timestamp of a time window can be added in the select statement as a property of the window alias as `w.start` and `w.end`, respectively. +Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively. The window start and rowtime timestamps are the inclusive lower and upper window boundaries, respectively. In contrast, the window end timestamp is the exclusive upper window boundary.
For example a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29:59.999` as rowtime timestamp, and `14:30:00.000` as end timestamp. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -1186,7 +1186,7 @@ Window properties such as the start and end timestamp of a time window can be ad Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w - .select("a, w.start, w.end, b.count"); // aggregate and add window start and end timestamps + .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} </div> @@ -1195,7 +1195,7 @@ Table table = input val table = input .window([w: Window] as 'w) // define window with alias w .groupBy('w, 'a) // group the table by attribute a and window w - .select('a, 'w.start, 'w.end, 'b.count) // aggregate and add window start and end timestamps + .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} </div> </div> @@ -1227,7 +1227,7 @@ Tumbling windows are defined by using the `Tumble` class as follows: </tr> <tr> <td><code>as</code></td> - <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td> + <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td> </tr> </tbody> </table> @@ -1289,7 +1289,7 @@ Sliding windows are defined by using the `Slide` class as follows: </tr> <tr> <td><code>as</code></td> - <td>Assigns an alias to the window. 
The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td> + <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td> </tr> </tbody> </table> @@ -1347,7 +1347,7 @@ A session window is defined by using the `Session` class as follows: </tr> <tr> <td><code>as</code></td> - <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start or end time in the <code>select()</code> clause.</td> + <td>Assigns an alias to the window. The alias is used to reference the window in the following <code>groupBy()</code> clause and optionally to select window properties such as window start, end, or rowtime timestamps in the <code>select()</code> clause.</td> </tr> </tbody> </table> http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java index fd5ddf9..0bb26da 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java +++ b/flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java @@ -22,6 +22,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.type.ReturnTypes; import 
org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.validate.SqlMonotonicity; import com.google.common.collect.ImmutableList; @@ -84,6 +85,25 @@ public class SqlGroupFunction extends SqlFunction { this(kind.name(), kind, groupFunction, operandTypeChecker); } + /** Creates a SqlGroupFunction. + * + * @param name Function name + * @param kind Kind + * @param groupFunction Group function, if this is an auxiliary; + * null, if this is a group function + * @param returnTypeInference Inference of the function's return type + * @param operandTypeChecker Operand type checker + */ + public SqlGroupFunction(String name, SqlKind kind, SqlGroupFunction groupFunction, + SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker) { + super(name, kind, returnTypeInference, null, operandTypeChecker, + SqlFunctionCategory.SYSTEM); + this.groupFunction = groupFunction; + if (groupFunction != null) { + assert groupFunction.groupFunction == null; + } + } + /** Creates an auxiliary function from this grouped window function.
* * @param kind Kind; also determines function name http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala index dab9ce3..bad5889 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala @@ -20,9 +20,9 @@ package org.apache.flink.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{Table, UnresolvedException, ValidationException} +import org.apache.flink.table.api._ import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType} +import org.apache.flink.table.calcite.FlinkTypeFactory._ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -163,9 +163,13 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { override private[flink] def validateInput(): ValidationResult = { child match { - case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) => + case WindowReference(_, Some(tpe)) if isProctimeIndicatorType(tpe) => ValidationFailure("A proctime window cannot provide a rowtime attribute.") case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) => + // rowtime window + ValidationSuccess + case WindowReference(_, Some(tpe)) if 
tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP => + // batch time window ValidationSuccess case WindowReference(_, _) => ValidationFailure("Reference to a rowtime or proctime window required.") @@ -175,8 +179,19 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { } } - override def resultType: TypeInformation[_] = - TimeIndicatorTypeInfo.ROWTIME_INDICATOR + override def resultType: TypeInformation[_] = { + child match { + case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) => + // rowtime window + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case WindowReference(_, Some(tpe)) if tpe == Types.LONG || tpe == Types.SQL_TIMESTAMP => + // batch time window + Types.SQL_TIMESTAMP + case _ => + throw TableException("WindowReference of RowtimeAttribute has invalid type. " + + "Please report this bug.") + } + } override def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala index c228528..74f7b1b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.tools.RelBuilder -import org.apache.flink.table.api.{TableException, ValidationException} +import 
org.apache.flink.table.api.{TableException, Types, ValidationException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical.LogicalWindow @@ -59,23 +59,24 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa agg: LogicalWindowAggregate): RelNode = { val w = agg.getWindow - - val isRowtime = ExpressionUtils.isRowtimeAttribute(w.timeAttribute) - val isProctime = ExpressionUtils.isProctimeAttribute(w.timeAttribute) + val windowType = getWindowType(w) val startEndProperties = Seq( NamedWindowProperty(propertyName(w, "start"), WindowStart(w.aliasAttribute)), NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute))) // allow rowtime/proctime for rowtime windows and proctime for proctime windows - val timeProperties = if (isRowtime) { - Seq( - NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), - NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) - } else if (isProctime) { - Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) - } else { - Seq() + val timeProperties = windowType match { + case 'streamRowtime => + Seq( + NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), + NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + case 'streamProctime => + Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + case 'batchRowtime => + Seq(NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute))) + case _ => + throw new TableException("Unknown window type encountered. 
Please report this bug.") } val properties = startEndProperties ++ timeProperties @@ -103,6 +104,18 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa builder.build() } + private def getWindowType(window: LogicalWindow): Symbol = { + if (ExpressionUtils.isRowtimeAttribute(window.timeAttribute)) { + 'streamRowtime + } else if (ExpressionUtils.isProctimeAttribute(window.timeAttribute)) { + 'streamProctime + } else if (window.timeAttribute.resultType == Types.SQL_TIMESTAMP) { + 'batchRowtime + } else { + throw new TableException("Unknown window type encountered. Please report this bug.") + } + } + /** Generates a property name for a window. */ private def propertyName(window: LogicalWindow, name: String): String = { window.aliasAttribute.asInstanceOf[WindowReference].name + name @@ -115,9 +128,7 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa builder: RelBuilder): RexNode = { val rexBuilder = builder.getRexBuilder - - val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute) - val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute) + val windowType = getWindowType(window) node match { case c: RexCall if isWindowStart(c) => @@ -129,22 +140,25 @@ abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleNa rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "end")), false) case c: RexCall if isWindowRowtime(c) => - if (isProctime) { - throw ValidationException("A proctime window cannot provide a rowtime attribute.") - } else if (isRowtime) { - // replace expression by access to window rowtime - builder.field(propertyName(window, "rowtime")) - } else { - throw TableException("Accessing the rowtime attribute of a window is not yet " + - "supported in a batch environment.") + windowType match { + case 'streamRowtime | 'batchRowtime => + // replace expression by access to window rowtime + builder.field(propertyName(window, "rowtime")) + 
case 'streamProctime => + throw ValidationException("A proctime window cannot provide a rowtime attribute.") + case _ => + throw new TableException("Unknown window type encountered. Please report this bug.") } case c: RexCall if isWindowProctime(c) => - if (isProctime || isRowtime) { - // replace expression by access to window proctime - builder.field(propertyName(window, "proctime")) - } else { - throw ValidationException("Proctime is not supported in a batch environment.") + windowType match { + case 'streamProctime | 'streamRowtime => + // replace expression by access to window proctime + builder.field(propertyName(window, "proctime")) + case 'batchRowtime => + throw ValidationException("PROCTIME window property is not supported in batch queries.") + case _ => + throw new TableException("Unknown window type encountered. Please report this bug.") } case c: RexCall => http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala index 129e0d3..346eefa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala @@ -67,23 +67,24 @@ class DataSetLogicalWindowAggregateRule } } + val timeField = getFieldReference(windowExpr.getOperands.get(0)) windowExpr.getOperator match { case BasicOperatorTable.TUMBLE => val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, 
TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) case BasicOperatorTable.HOP => val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) val w = Slide .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) case BasicOperatorTable.SESSION => val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(getFieldReference(windowExpr.getOperands.get(0))).as(WindowReference("w$")) + w.on(timeField).as(WindowReference("w$", Some(timeField.resultType))) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index eaad885..254f36b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -23,7 +23,6 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeName -import 
org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.api.{TableException, ValidationException, Window} import org.apache.flink.table.calcite.FlinkTypeFactory @@ -89,7 +88,7 @@ class DataStreamLogicalWindowAggregateRule val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.HOP => val time = getOperandAsTimeIndicator(windowExpr, 0) @@ -98,14 +97,14 @@ class DataStreamLogicalWindowAggregateRule .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) case BasicOperatorTable.SESSION => val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - w.on(time).as(WindowReference("w$")) + w.on(time).as(WindowReference("w$", Some(time.resultType))) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index ce13cdc..bdfdbf5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -595,7 +595,7 @@ object AggregateUtil { window match { case 
TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => // tumbling time window - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for incremental aggregations new DataSetTumbleTimeWindowAggReduceCombineFunction( @@ -604,6 +604,7 @@ object AggregateUtil { asLong(size), startPos, endPos, + timePos, keysAndAggregatesArity) } else { @@ -613,6 +614,7 @@ object AggregateUtil { asLong(size), startPos, endPos, + timePos, outputType.getFieldCount) } case TumblingGroupWindow(_, _, size) => @@ -622,17 +624,18 @@ object AggregateUtil { asLong(size)) case SessionGroupWindow(_, _, gap) => - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) new DataSetSessionWindowAggReduceGroupFunction( genFinalAggFunction, keysAndAggregatesArity, startPos, endPos, + timePos, asLong(gap), isInputCombined) case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) => - val (startPos, endPos, _) = computeWindowPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for partial aggregations new DataSetSlideWindowAggReduceCombineFunction( @@ -641,6 +644,7 @@ object AggregateUtil { keysAndAggregatesArity, startPos, endPos, + timePos, asLong(size)) } else { @@ -650,6 +654,7 @@ object AggregateUtil { keysAndAggregatesArity, startPos, endPos, + timePos, asLong(size)) } @@ -659,6 +664,7 @@ object AggregateUtil { keysAndAggregatesArity, None, None, + None, asLong(size)) case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index d99ca31..372fc0d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -44,6 +44,7 @@ import org.apache.flink.util.Collector * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos The relative window-start field position. * @param finalRowWindowEndPos The relative window-end field position. + * @param finalRowWindowRowtimePos The relative window-rowtime field position. * @param gap Session time window gap. */ class DataSetSessionWindowAggReduceGroupFunction( @@ -51,13 +52,14 @@ class DataSetSessionWindowAggReduceGroupFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + finalRowWindowRowtimePos: Option[Int], gap: Long, isInputCombined: Boolean) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ private val intermediateRowWindowStartPos = keysAndAggregatesArity private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1 @@ -78,10 +80,10 @@ class DataSetSessionWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector( + collector = new DataSetTimeWindowPropertyCollector( finalRowWindowStartPos, finalRowWindowEndPos, - None) + finalRowWindowRowtimePos) } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala index 381d443..2da838f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala @@ -36,6 +36,8 @@ import org.apache.flink.types.Row * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the + * output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( @@ -44,12 +46,14 @@ class DataSetSlideWindowAggReduceCombineFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + finalRowWindowRowtimePos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( genFinalAggregations, keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, + finalRowWindowRowtimePos, windowSize) with CombineFunction[Row, Row] { 
http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala index c64a522..474a09b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -36,6 +36,8 @@ import org.apache.flink.util.Collector * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param finalRowWindowRowtimePos relative window-rowtime position to the last field of the + * output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceGroupFunction( @@ -43,12 +45,13 @@ class DataSetSlideWindowAggReduceGroupFunction( keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], + finalRowWindowRowtimePos: Option[Int], windowSize: Long) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ protected val windowStartPos: Int = keysAndAggregatesArity private var output: Row = _ @@ -68,10 +71,10 @@ class DataSetSlideWindowAggReduceGroupFunction( output 
= function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector( + collector = new DataSetTimeWindowPropertyCollector( finalRowWindowStartPos, finalRowWindowEndPos, - None) + finalRowWindowRowtimePos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { @@ -93,7 +96,10 @@ class DataSetSlideWindowAggReduceGroupFunction( function.setAggregationResults(accumulators, output) // adds TimeWindow properties to output then emit output - if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { + if (finalRowWindowStartPos.isDefined || + finalRowWindowEndPos.isDefined || + finalRowWindowRowtimePos.isDefined) { + collector.wrappedCollector = out collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long] collector.windowEnd = collector.windowStart + windowSize http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala index 4a459b2..9eeab33 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala @@ -37,6 +37,8 @@ import org.apache.flink.types.Row * output row * @param windowEndPos The relative window-end field position to the last field of * output row + * @param windowRowtimePos The relative window-rowtime field position to the last 
field of + * output row * @param keysAndAggregatesArity The total arity of keys and aggregates */ class DataSetTumbleTimeWindowAggReduceCombineFunction( @@ -45,12 +47,14 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction( windowSize: Long, windowStartPos: Option[Int], windowEndPos: Option[Int], + windowRowtimePos: Option[Int], keysAndAggregatesArity: Int) extends DataSetTumbleTimeWindowAggReduceGroupFunction( genFinalAggregations, windowSize, windowStartPos, windowEndPos, + windowRowtimePos, keysAndAggregatesArity) with CombineFunction[Row, Row] { http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 7ae4c17..4e92148 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -35,6 +35,8 @@ import org.apache.flink.util.Collector * @param windowSize Tumbling time window size * @param windowStartPos The relative window-start field position to the last field of output row * @param windowEndPos The relative window-end field position to the last field of output row + * @param windowRowtimePos The relative window-rowtime field position to the last field of + * output row * @param keysAndAggregatesArity The total arity of keys and aggregates */ class DataSetTumbleTimeWindowAggReduceGroupFunction( @@ -42,12 +44,13 @@ class 
DataSetTumbleTimeWindowAggReduceGroupFunction( windowSize: Long, windowStartPos: Option[Int], windowEndPos: Option[Int], + windowRowtimePos: Option[Int], keysAndAggregatesArity: Int) extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] with Logging { - private var collector: RowTimeWindowPropertyCollector = _ + private var collector: DataSetTimeWindowPropertyCollector = _ protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1) private var output: Row = _ @@ -67,7 +70,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos, None) + collector = new DataSetTimeWindowPropertyCollector( + windowStartPos, + windowEndPos, + windowRowtimePos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala index 3c2e858..818418e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala @@ -42,10 +42,10 @@ class IncrementalAggregateAllTimeWindowFunction( extends IncrementalAggregateAllWindowFunction[TimeWindow]( finalRowArity) { - private var collector: CRowTimeWindowPropertyCollector = _ + private var 
collector: DataStreamTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector( + collector = new DataStreamTimeWindowPropertyCollector( windowStartOffset, windowEndOffset, windowRowtimeOffset) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index 69e4f7b..a908f49 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -48,10 +48,10 @@ class IncrementalAggregateTimeWindowFunction( numAggregates, finalRowArity) { - private var collector: CRowTimeWindowPropertyCollector = _ + private var collector: DataStreamTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector( + collector = new DataStreamTimeWindowPropertyCollector( windowStartOffset, windowEndOffset, windowRowtimeOffset) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 16e4a0b..e9ecf0f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -40,6 +40,8 @@ abstract class TimeWindowPropertyCollector[T]( def getRow(record: T): Row + def setRowtimeAttribute(pos: Int): Unit + override def collect(record: T): Unit = { output = getRow(record) @@ -57,9 +59,7 @@ abstract class TimeWindowPropertyCollector[T]( } if (windowRowtimeOffset.isDefined) { - output.setField( - lastFieldPos + windowRowtimeOffset.get, - windowEnd - 1) + setRowtimeAttribute(lastFieldPos + windowRowtimeOffset.get) } wrappedCollector.collect(record) @@ -68,20 +68,28 @@ abstract class TimeWindowPropertyCollector[T]( override def close(): Unit = wrappedCollector.close() } -class RowTimeWindowPropertyCollector( +final class DataSetTimeWindowPropertyCollector( startOffset: Option[Int], endOffset: Option[Int], rowtimeOffset: Option[Int]) extends TimeWindowPropertyCollector[Row](startOffset, endOffset, rowtimeOffset) { override def getRow(record: Row): Row = record + + override def setRowtimeAttribute(pos: Int): Unit = { + output.setField(pos, SqlFunctions.internalToTimestamp(windowEnd - 1)) + } } -class CRowTimeWindowPropertyCollector( +final class DataStreamTimeWindowPropertyCollector( startOffset: Option[Int], endOffset: Option[Int], rowtimeOffset: Option[Int]) extends TimeWindowPropertyCollector[CRow](startOffset, endOffset, rowtimeOffset) { override def getRow(record: CRow): Row = record.row + + override def setRowtimeAttribute(pos: Int): Unit = { + output.setField(pos, windowEnd - 1) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 6c6be0b..535fd6e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.validate -import org.apache.calcite.sql.`type`.OperandTypes +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeTransforms} import org.apache.calcite.sql.fun.{SqlGroupFunction, SqlStdOperatorTable} import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} import org.apache.calcite.sql.{SqlFunction, SqlKind, SqlOperator, SqlOperatorTable} @@ -442,7 +442,13 @@ object BasicOperatorTable { val TUMBLE_START: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_START) val TUMBLE_END: SqlGroupFunction = TUMBLE.auxiliary(SqlKind.TUMBLE_END) val TUMBLE_ROWTIME: SqlGroupFunction = - TUMBLE.auxiliary("TUMBLE_ROWTIME", SqlKind.OTHER_FUNCTION) + new SqlGroupFunction( + "TUMBLE_ROWTIME", + SqlKind.OTHER_FUNCTION, + TUMBLE, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + TUMBLE.getOperandTypeChecker) val TUMBLE_PROCTIME: SqlGroupFunction = TUMBLE.auxiliary("TUMBLE_PROCTIME", SqlKind.OTHER_FUNCTION) @@ -461,7 +467,14 @@ object BasicOperatorTable { } val HOP_START: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_START) val HOP_END: SqlGroupFunction = HOP.auxiliary(SqlKind.HOP_END) - val HOP_ROWTIME: SqlGroupFunction = HOP.auxiliary("HOP_ROWTIME", SqlKind.OTHER_FUNCTION) + val HOP_ROWTIME: SqlGroupFunction = + new SqlGroupFunction( + "HOP_ROWTIME", + SqlKind.OTHER_FUNCTION, + 
HOP, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + HOP.getOperandTypeChecker) val HOP_PROCTIME: SqlGroupFunction = HOP.auxiliary("HOP_PROCTIME", SqlKind.OTHER_FUNCTION) val SESSION: SqlGroupFunction = new SqlGroupFunction( @@ -478,7 +491,13 @@ object BasicOperatorTable { val SESSION_START: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_START) val SESSION_END: SqlGroupFunction = SESSION.auxiliary(SqlKind.SESSION_END) val SESSION_ROWTIME: SqlGroupFunction = - SESSION.auxiliary("SESSION_ROWTIME", SqlKind.OTHER_FUNCTION) + new SqlGroupFunction( + "SESSION_ROWTIME", + SqlKind.OTHER_FUNCTION, + SESSION, + // ensure that returned rowtime is always NOT_NULLABLE + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NOT_NULLABLE), + SESSION.getOperandTypeChecker) val SESSION_PROCTIME: SqlGroupFunction = SESSION.auxiliary("SESSION_PROCTIME", SqlKind.OTHER_FUNCTION) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index cb31866..59aee9f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.api.batch.sql import java.sql.Timestamp import org.apache.flink.api.scala._ -import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ 
import org.apache.flink.table.plan.logical._ @@ -63,6 +62,7 @@ class GroupWindowTest extends TableTestBase { "SELECT " + " TUMBLE_START(ts, INTERVAL '4' MINUTE), " + " TUMBLE_END(ts, INTERVAL '4' MINUTE), " + + " TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE), " + " c, " + " SUM(a) AS sumA, " + " MIN(b) AS minB " + @@ -78,9 +78,10 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "c"), term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " + - "start('w$) AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, c, sumA, minB") + term("select", "CAST(w$start) AS EXPR$0, CAST(w$end) AS EXPR$1, " + + "w$rowtime AS EXPR$2, c, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -149,6 +150,7 @@ class GroupWindowTest extends TableTestBase { " c, " + " HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + " HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + + " HOP_ROWTIME(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " + " SUM(a) AS sumA, " + " AVG(b) AS avgB " + "FROM T " + @@ -164,9 +166,10 @@ class GroupWindowTest extends TableTestBase { term("window", SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)), term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " + - "start('w$) AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, sumA, avgB") + term("select", "c, CAST(w$end) AS EXPR$1, CAST(w$start) AS EXPR$2, " + + "w$rowtime AS EXPR$3, sumA, avgB") ) util.verifySql(sqlQuery, expected) @@ -205,6 +208,7 @@ class GroupWindowTest extends TableTestBase { " c, d, " + " SESSION_START(ts, INTERVAL '12' HOUR), " + " SESSION_END(ts, INTERVAL '12' HOUR), " + + " SESSION_ROWTIME(ts, INTERVAL '12' HOUR), " + " SUM(a) AS sumA, " + " MIN(b) AS minB " + "FROM T " + 
@@ -219,9 +223,10 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "c, d"), term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)), term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " + - "start('w$) AS w$start, end('w$) AS w$end") + "start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), - term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, sumA, minB") + term("select", "c, d, CAST(w$start) AS EXPR$2, CAST(w$end) AS EXPR$3, " + + "w$rowtime AS EXPR$4, sumA, minB") ) util.verifySql(sqlQuery, expected) @@ -250,7 +255,7 @@ class GroupWindowTest extends TableTestBase { ), term("groupBy", "c"), term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), - term("select", "c, start('w$) AS w$start, end('w$) AS w$end") + term("select", "c, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime") ), term("select", "CAST(w$end) AS EXPR$0") ) @@ -288,7 +293,8 @@ class GroupWindowTest extends TableTestBase { "COUNT(*) AS EXPR$0", "SUM(a) AS $f1", "start('w$) AS w$start", - "end('w$) AS w$end") + "end('w$) AS w$end, " + + "rowtime('w$) AS w$rowtime") ), term("select", "EXPR$0", "CAST(w$start) AS EXPR$1"), term("where", http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala index cbf3029..e32b0a9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.api.batch.sql.validation import java.sql.Timestamp import org.apache.flink.api.scala._ -import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge} +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.utils.TableTestBase @@ -80,21 +80,6 @@ class GroupWindowValidationTest extends TableTestBase { util.verifySql(sql, "n/a") } - @Test(expected = classOf[TableException]) - def testWindowRowtime(): Unit = { - val util = batchTestUtil() - util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts) - - val sqlQuery = - "SELECT " + - " TUMBLE_ROWTIME(ts, INTERVAL '4' MINUTE)" + - "FROM T " + - "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c" - - // should fail because ROWTIME properties are not yet supported in batch - util.verifySql(sqlQuery, "FAIL") - } - @Test(expected = classOf[ValidationException]) def testWindowProctime(): Unit = { val util = batchTestUtil() http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala index 6a2f1a7..ad44e09 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala @@ -18,6 +18,8 
@@ package org.apache.flink.table.api.batch.table +import java.sql.Timestamp + import org.apache.flink.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ @@ -146,6 +148,50 @@ class GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + @Test + def testLongEventTimeTumblingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Tumble over 2.hours on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeTumblingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Tumble over 2.hours on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", TumblingGroupWindow(WindowReference("w"), 'ts, 2.hours)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + //=============================================================================================== // Sliding Windows //=============================================================================================== @@ -268,6 +314,50 @@ class 
GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + @Test + def testLongEventTimeSlidingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Slide over 1.hour every 10.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeSlidingGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Slide over 1.hour every 10.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SlidingGroupWindow(WindowReference("w"), 'ts, 1.hour, 10.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + //=============================================================================================== // Session Windows //=============================================================================================== @@ -315,4 +405,48 @@ class GroupWindowTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + + @Test + def testLongEventTimeSessionGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, 
String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Session withGap 30.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTimestampEventTimeSessionGroupWindowWithProperties(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Timestamp, Int, String)]('ts, 'int, 'string) + + val windowedTable = table + .window(Session withGap 30.minutes on 'ts as 'w) + .groupBy('w, 'string) + .select('string, 'int.count, 'w.start, 'w.end, 'w.rowtime) + + val expected = unaryNode( + "DataSetWindowAggregate", + batchTableNode(0), + term("groupBy", "string"), + term("window", SessionGroupWindow(WindowReference("w"), 'ts, 30.minutes)), + term("select", "string", "COUNT(int) AS TMP_0", + "start('w) AS TMP_1", "end('w) AS TMP_2", "rowtime('w) AS TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala index aa934c6..b105ec02 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala @@ -18,8 +18,7 @@ 
package org.apache.flink.table.runtime.batch.sql -import java.sql.Timestamp - +import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.table.api.TableEnvironment @@ -312,7 +311,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -341,10 +340,10 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) - val result = tEnv.sql(sqlQuery).toDataSet[Row].collect() + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() val expected = Seq( "1,{1=1}", "2,{2=1}", "2,{2=1}", @@ -358,6 +357,41 @@ class AggregateITCase( } @Test + def testTumbleWindowWithProperties(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT b, COUNT(a), " + + "TUMBLE_START(ts, INTERVAL '5' SECOND), " + + "TUMBLE_END(ts, INTERVAL '5' SECOND), " + + "TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND)" + + "FROM T " + + "GROUP BY b, TUMBLE(ts, INTERVAL '5' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // min time unit is seconds + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 
00:00:04.999", + "3,1,1970-01-01 00:00:00.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,3,1970-01-01 00:00:05.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,4,1970-01-01 00:00:10.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,4,1970-01-01 00:00:15.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } + + @Test def testHopWindowAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -365,8 +399,6 @@ class AggregateITCase( tEnv.registerFunction("countFun", new CountAggFunction) tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset) - env.setParallelism(1) - val sqlQuery = "SELECT b, SUM(a), countFun(c), wAvgWithMergeAndReset(b, a), wAvgWithMergeAndReset(a, a)" + "FROM T " + @@ -374,7 +406,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -391,6 +423,48 @@ class AggregateITCase( } @Test + def testHopWindowWithProperties(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT b, COUNT(a), " + + "HOP_START(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " + + "HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), " + + "HOP_ROWTIME(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + + "FROM T " + + "GROUP BY b, HOP(ts, INTERVAL '5' 
SECOND, INTERVAL '10' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // create timestamps + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "1,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "2,2,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "3,1,1969-12-31 23:59:55.0,1970-01-01 00:00:05.0,1970-01-01 00:00:04.999", + "1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "3,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "4,3,1970-01-01 00:00:00.0,1970-01-01 00:00:10.0,1970-01-01 00:00:09.999", + "3,2,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "4,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "5,4,1970-01-01 00:00:05.0,1970-01-01 00:00:15.0,1970-01-01 00:00:14.999", + "4,1,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "5,5,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "6,4,1970-01-01 00:00:10.0,1970-01-01 00:00:20.0,1970-01-01 00:00:19.999", + "5,1,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999", + "6,6,1970-01-01 00:00:15.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999", + "6,2,1970-01-01 00:00:20.0,1970-01-01 00:00:30.0,1970-01-01 00:00:29.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } + + @Test def testSessionWindowAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -398,8 +472,6 @@ class AggregateITCase( tEnv.registerFunction("countFun", new CountAggFunction) tEnv.registerFunction("wAvgWithMergeAndReset", new WeightedAvgWithMergeAndReset) - env.setParallelism(1) - val sqlQuery = "SELECT MIN(a), MAX(a), SUM(a), countFun(c), 
wAvgWithMergeAndReset(b, a), " + "wAvgWithMergeAndReset(a, a)" + @@ -409,7 +481,7 @@ class AggregateITCase( val ds = CollectionDataSets.get3TupleDataSet(env) // create timestamps .filter(x => (x._2 % 2) == 0) - .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000))) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() @@ -420,4 +492,33 @@ class AggregateITCase( TestBaseUtils.compareResultAsText(result.asJava, expected) } + + @Test + def testSessionWindowWithProperties(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val sqlQuery = + "SELECT COUNT(a), " + + "SESSION_START(ts, INTERVAL '4' SECOND), " + + "SESSION_END(ts, INTERVAL '4' SECOND), " + + "SESSION_ROWTIME(ts, INTERVAL '4' SECOND) " + + "FROM T " + + "GROUP BY SESSION(ts, INTERVAL '4' SECOND)" + + val ds = CollectionDataSets.get3TupleDataSet(env) + // create timestamps + .filter(x => (x._2 % 2) == 0) + .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000))) + tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts) + + val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() + val expected = Seq( + "6,1970-01-01 00:00:02.0,1970-01-01 00:00:14.0,1970-01-01 00:00:13.999", + "6,1970-01-01 00:00:16.0,1970-01-01 00:00:25.0,1970-01-01 00:00:24.999" + ).mkString("\n") + + TestBaseUtils.compareResultAsText(result.asJava, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala index 
718bc5c..3d9223e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala @@ -22,7 +22,7 @@ import java.math.BigDecimal import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableEnvironment, ValidationException} +import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode @@ -101,14 +101,15 @@ class GroupWindowITCase( val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) .groupBy('w, 'string) - .select('string, 'int.sum, 'w.start, 'w.end) + .select('string, 'int.sum, 'w.start, 'w.end, 'w.rowtime) - val expected = "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" + - "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + val expected = + "Hello world,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "Hello world,4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" + + "Hello,7,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "Hello,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "Hallo,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" val results = windowedTable.toDataSet[Row].collect() 
TestBaseUtils.compareResultAsText(results.asJava, expected) @@ -126,11 +127,12 @@ class GroupWindowITCase( val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) .groupBy('w) - .select('int.sum, 'w.start, 'w.end) + .select('int.sum, 'w.start, 'w.end, 'w.rowtime) - val expected = "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" + - "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01\n" + - "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02\n" + val expected = + "10,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004\n" + + "6,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "4,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,1970-01-01 00:00:00.019\n" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) @@ -147,15 +149,16 @@ class GroupWindowITCase( val windowedTable = table .window(Session withGap 7.milli on 'long as 'w) .groupBy('string, 'w) - .select('string, 'string.count, 'w.start, 'w.end) + .select('string, 'string.count, 'w.start, 'w.end, 'w.rowtime) val results = windowedTable.toDataSet[Row].collect() - val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" + - "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\n" + - "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\n" + - "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014\n" + - "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008" + val expected = + "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" + + "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.015,1970-01-01 00:00:00.014\n" + + "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.023,1970-01-01 00:00:00.022\n" + + "Hello,3,1970-01-01 00:00:00.003,1970-01-01 00:00:00.014,1970-01-01 00:00:00.013\n" + + "Hi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008,1970-01-01 00:00:00.007" TestBaseUtils.compareResultAsText(results.asJava, 
expected) } @@ -167,18 +170,20 @@ class GroupWindowITCase( .fromCollection(data) .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) - val results =table + val results = table .window(Session withGap 2.milli on 'long as 'w) .groupBy('w) - .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect() + .select('string.count, 'w.start, 'w.end, 'w.rowtime) + .toDataSet[Row].collect() - val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" + - "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" + - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018" + val expected = + "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006,1970-01-01 00:00:00.005\n" + + "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01,1970-01-01 00:00:00.009\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018,1970-01-01 00:00:00.017" TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[ValidationException]) + @Test def testMultiGroupWindow(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -186,14 +191,24 @@ class GroupWindowITCase( val table = env .fromCollection(data) .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) - table - .window(Tumble over 5.milli on 'long as 'w) + + val results = table + .window(Tumble over 2.milli on 'long as 'w) .groupBy('w, 'string) - .select('string, 'int.count) - .window( Slide over 5.milli every 1.milli on 'int as 'w2) - .groupBy('w2) - .select('string) - .toDataSet[Row] + .select('string, 'int.count as 'cnt, 'w.rowtime as 'time) + .window(Tumble over 6.milli on 'time as 'w2) + .groupBy('w2, 'string) + .select('string, 'cnt.sum as 'cnt, 'w2.end) + .toDataSet[Row].collect() + + val expected = + "Hallo,1,1970-01-01 00:00:00.006\n" + + "Hello world,1,1970-01-01 00:00:00.012\n" + + "Hello world,1,1970-01-01 00:00:00.018\n" + + "Hello,1,1970-01-01 00:00:00.012\n" + + "Hello,2,1970-01-01 00:00:00.006\n" + + 
"Hi,1,1970-01-01 00:00:00.006\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) } // ---------------------------------------------------------------------------------------------- @@ -230,18 +245,18 @@ class GroupWindowITCase( val windowedTable = table .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) - .select('int.count, 'w.start, 'w.end) + .select('int.count, 'w.start, 'w.end, 'w.rowtime) val expected = - "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" + - "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" + - "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" + - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" + - "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" + - "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" + - "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" + - "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" + - "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005" + "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012\n" + + "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016\n" + + "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02\n" + + "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002\n" + + "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01\n" + + "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006\n" + + "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008\n" + + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 32e3724..c49af5c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -418,7 +418,9 @@ class SqlITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row] + val resultHopStartEndWithHaving = tEnv + .sqlQuery(sqlQueryHopStartEndWithHaving) + .toAppendStream[Row] resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row]) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/babee277/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index a9d4e44..1eebeee 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -254,25 +254,25 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { val windowedTable = table .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) - .select('int.count, 'w.start, 'w.end) + .select('int.count, 'w.start, 'w.end, 'w.rowtime) val results = windowedTable.toAppendStream[Row] 
results.addSink(new StreamITCase.StringSink[Row]) env.execute() val expected = Seq( - "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013", - "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017", - "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019", - "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021", - "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003", - "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011", - "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007", - "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009", - "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", - "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033", - "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035", - "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037") + "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013,1970-01-01 00:00:00.012", + "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017,1970-01-01 00:00:00.016", + "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019,1970-01-01 00:00:00.018", + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021,1970-01-01 00:00:00.02", + "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003,1970-01-01 00:00:00.002", + "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011,1970-01-01 00:00:00.01", + "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007,1970-01-01 00:00:00.006", + "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009,1970-01-01 00:00:00.008", + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1970-01-01 00:00:00.004", + "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033,1970-01-01 00:00:00.032", + "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035,1970-01-01 00:00:00.034", + "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037,1970-01-01 00:00:00.036") assertEquals(expected.sorted, StreamITCase.testResults.sorted) }
