[FLINK-6392] [table] Improve group window API to make alias mandatory. This closes #3786.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb972b85 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb972b85 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb972b85 Branch: refs/heads/master Commit: bb972b85a76d9b98ca5e90033e448f68f9109485 Parents: 69905e1 Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Thu Apr 27 09:58:18 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Apr 27 17:04:00 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/table/api/java/windows.scala | 18 +- .../apache/flink/table/api/scala/windows.scala | 18 +- .../org/apache/flink/table/api/table.scala | 5 +- .../org/apache/flink/table/api/windows.scala | 297 ++++++++++++------- .../table/plan/logical/LogicalWindow.scala | 7 +- .../flink/table/plan/logical/groupWindows.scala | 28 +- .../flink/table/plan/logical/operators.scala | 2 +- .../common/WindowStartEndPropertiesRule.scala | 4 +- .../scala/batch/sql/WindowAggregateTest.scala | 12 +- .../scala/batch/table/FieldProjectionTest.scala | 4 +- .../api/scala/batch/table/GroupWindowTest.scala | 18 +- .../scala/stream/sql/WindowAggregateTest.scala | 6 +- .../scala/stream/table/GroupWindowTest.scala | 48 +-- 13 files changed, 274 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala index 1e9d65a..15208ce 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.api.java -import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow} +import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap} import org.apache.flink.table.expressions.{Expression, ExpressionParser} /** @@ -34,9 +34,9 @@ object Tumble { * elements in 5 minutes intervals. * * @param size the size of the window as time or row-count interval. - * @return a tumbling window + * @return a partially defined tumbling window */ - def over(size: String): TumblingWindow = new TumblingWindow(size) + def over(size: String): TumbleWithSize = new TumbleWithSize(size) } /** @@ -79,9 +79,9 @@ object Session { * * @param gap specifies how long (as interval of milliseconds) to wait for new data before * closing the session window. - * @return a session window + * @return a partially defined session window */ - def withGap(gap: String): SessionWindow = new SessionWindow(gap) + def withGap(gap: String): SessionWithGap = new SessionWithGap(gap) } /** @@ -96,9 +96,9 @@ object Over { * * For batch tables, refer to a timestamp or long attribute. */ - def orderBy(orderBy: String): OverWindowPredefined = { + def orderBy(orderBy: String): OverWindowWithOrderBy = { val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowPredefined(Seq[Expression](), orderByExpr) + new OverWindowWithOrderBy(Seq[Expression](), orderByExpr) } /** @@ -122,8 +122,8 @@ class PartitionedOver(private val partitionByExpr: Array[Expression]) { * * For batch tables, refer to a timestamp or long attribute. */ - def orderBy(orderBy: String): OverWindowPredefined = { + def orderBy(orderBy: String): OverWindowWithOrderBy = { val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowPredefined(partitionByExpr, orderByExpr) + new OverWindowWithOrderBy(partitionByExpr, orderByExpr) } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 1230027..5e70440 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.api.{OverWindowPredefined, SessionWindow, SlideWithSize, TumblingWindow} +import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap} /** * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -34,9 +34,9 @@ object Tumble { * elements in 5 minutes intervals. * * @param size the size of the window as time or row-count interval. - * @return a tumbling window + * @return a partially defined tumbling window */ - def over(size: Expression): TumblingWindow = new TumblingWindow(size) + def over(size: Expression): TumbleWithSize = new TumbleWithSize(size) } /** @@ -79,9 +79,9 @@ object Session { * * @param gap specifies how long (as interval of milliseconds) to wait for new data before * closing the session window. - * @return a session window + * @return a partially defined session window */ - def withGap(gap: Expression): SessionWindow = new SessionWindow(gap) + def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap) } /** @@ -96,8 +96,8 @@ object Over { * * For batch tables, refer to a timestamp or long attribute. */ - def orderBy(orderBy: Expression): OverWindowPredefined = { - new OverWindowPredefined(Seq[Expression](), orderBy) + def orderBy(orderBy: Expression): OverWindowWithOrderBy = { + new OverWindowWithOrderBy(Seq[Expression](), orderBy) } /** @@ -120,7 +120,7 @@ case class PartitionedOver(partitionBy: Array[Expression]) { * * For batch tables, refer to a timestamp or long attribute. */ - def orderBy(orderBy: Expression): OverWindowPredefined = { - new OverWindowPredefined(partitionBy, orderBy) + def orderBy(orderBy: Expression): OverWindowWithOrderBy = { + new OverWindowWithOrderBy(partitionBy, orderBy) } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index c688aab..0953611 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -805,9 +805,6 @@ class Table( * @return A windowed table. */ def window(window: Window): WindowedTable = { - if (window.alias.isEmpty) { - throw new ValidationException("An alias must be specified for the window.") - } new WindowedTable(this, window) } @@ -936,7 +933,7 @@ class WindowedTable( * }}} */ def groupBy(fields: Expression*): WindowGroupedTable = { - val fieldsWithoutWindow = fields.filterNot(window.alias.get.equals(_)) + val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_)) if (fields.size != fieldsWithoutWindow.size + 1) { throw new ValidationException("GroupBy must contain exactly one window alias.") } http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala index 577d77c..80260f7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala @@ -35,32 +35,40 @@ case class OverWindow( case class CurrentRow() extends Expression { override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS + override private[flink] def children = Seq() + override def toString = "CURRENT ROW" } case class CurrentRange() extends Expression { override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS + override private[flink] def children = Seq() + override def toString = "CURRENT RANGE" } case class UnboundedRow() extends Expression { override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS + override private[flink] def children = Seq() + override def toString = "UNBOUNDED ROW" } case class UnboundedRange() extends Expression { override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS + override private[flink] def children = Seq() + override def toString = "UNBOUNDED RANGE" } /** - * An over window predefined specification. + * A partially defined over window. */ -class OverWindowPredefined( +class OverWindowWithOrderBy( private val partitionBy: Seq[Expression], private val orderBy: Expression) { @@ -100,7 +108,7 @@ class OverWindowPredefined( * @param preceding preceding offset relative to the current row. * @return this over window */ - def preceding(preceding: String): OverWindowPredefined = { + def preceding(preceding: String): OverWindowWithOrderBy = { this.preceding(ExpressionParser.parseExpression(preceding)) } @@ -110,7 +118,7 @@ class OverWindowPredefined( * @param preceding preceding offset relative to the current row. * @return this over window */ - def preceding(preceding: Expression): OverWindowPredefined = { + def preceding(preceding: Expression): OverWindowWithOrderBy = { this.preceding = preceding this } @@ -121,7 +129,7 @@ class OverWindowPredefined( * @param following following offset that relative to the current row. * @return this over window */ - def following(following: String): OverWindowPredefined = { + def following(following: String): OverWindowWithOrderBy = { this.following(ExpressionParser.parseExpression(following)) } @@ -131,7 +139,7 @@ class OverWindowPredefined( * @param following following offset that relative to the current row. * @return this over window */ - def following(following: Expression): OverWindowPredefined = { + def following(following: Expression): OverWindowWithOrderBy = { this.following = following this } @@ -149,15 +157,20 @@ class OverWindowPredefined( * * For finite batch tables, window provides shortcuts for time-based groupBy. * + * @param alias The expression of alias for this Window */ -abstract class Window { +abstract class Window(val alias: Expression) { - // The expression of alias for this Window - private[flink] var alias: Option[Expression] = None /** * Converts an API class to a logical window for planning. */ private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A window specification without alias. + */ +abstract class WindowWithoutAlias { /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can @@ -166,10 +179,7 @@ abstract class Window { * @param alias alias for this window * @return this window */ - def as(alias: Expression): Window = { - this.alias = Some(alias) - this - } + def as(alias: Expression): Window /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can @@ -182,36 +192,22 @@ abstract class Window { } /** - * A window operating on event-time. - * - * @param timeField defines the time mode for streaming tables. For batch table it defines the - * time attribute on which is grouped. - */ -abstract class EventTimeWindow(val timeField: Expression) extends Window - -// ------------------------------------------------------------------------------------------------ -// Tumbling windows -// ------------------------------------------------------------------------------------------------ - -/** - * Tumbling window. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are - * grouped by processing-time. - * - * @param size the size of the window either as time or row-count interval. + * A predefined specification of window on processing-time */ -class TumblingWindow(size: Expression) extends Window { +abstract class ProcTimeWindowWithoutAlias extends WindowWithoutAlias { /** - * Tumbling window. + * Specifies the time attribute on which rows are grouped. * * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows * are grouped by processing-time. * - * @param size the size of the window either as time or row-count interval. + * For batch tables, refer to a timestamp or long attribute. + * + * @param timeField time mode for streaming tables and time attribute for batch tables + * @return a predefined window on event-time */ - def this(size: String) = this(ExpressionParser.parseExpression(size)) + def on(timeField: Expression): WindowWithoutAlias /** * Specifies the time attribute on which rows are grouped. @@ -222,24 +218,85 @@ class TumblingWindow(size: Expression) extends Window { * For batch tables, refer to a timestamp or long attribute. * * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a tumbling window on event-time + * @return a predefined window on event-time */ - def on(timeField: Expression): TumblingEventTimeWindow = - new TumblingEventTimeWindow(timeField, size) + def on(timeField: String): WindowWithoutAlias = + on(ExpressionParser.parseExpression(timeField)) +} + +/** + * A window operating on event-time. + * + * For streaming tables call on('rowtime) to specify grouping by event-time. + * Otherwise rows are grouped by processing-time. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param timeField time mode for streaming tables and time attribute for batch tables + */ +abstract class EventTimeWindow(alias: Expression, val timeField: Expression) extends Window(alias) + +// ------------------------------------------------------------------------------------------------ +// Tumbling windows +// ------------------------------------------------------------------------------------------------ + +/** + * A partial specification of a tumbling window. + * + * @param size the size of the window either a time or a row-count interval. + */ +class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias { + + def this(size: String) = this(ExpressionParser.parseExpression(size)) /** * Specifies the time attribute on which rows are grouped. * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. + * Otherwise rows are grouped by processing-time. * * For batch tables, refer to a timestamp or long attribute. * * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a tumbling window on event-time + * @return a predefined window on event-time */ - def on(timeField: String): TumblingEventTimeWindow = - on(ExpressionParser.parseExpression(timeField)) + override def on(timeField: Expression): WindowWithoutAlias = + new TumbleWithoutAlias(timeField, size) + + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. + * + * @param alias alias for this window + * @return this window + */ + override def as(alias: Expression) = new TumblingWindow(alias, size) +} + +/** + * A tumbling window on event-time without alias. + */ +class TumbleWithoutAlias( + time: Expression, + size: Expression) extends WindowWithoutAlias { + + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. + * + * @param alias alias for this window + * @return this window + */ + override def as(alias: Expression): Window = new TumblingEventTimeWindow(alias, time, size) +} + +/** + * Tumbling window on processing-time. + * + * @param alias the alias of the window. + * @param size the size of the window either a time or a row-count interval. + */ +class TumblingWindow(alias: Expression, size: Expression) extends Window(alias) { override private[flink] def toLogicalWindow: LogicalWindow = ProcessingTimeTumblingGroupWindow(alias, size) @@ -249,9 +306,9 @@ class TumblingWindow(size: Expression) extends Window { * Tumbling window on event-time. */ class TumblingEventTimeWindow( + alias: Expression, time: Expression, - size: Expression) - extends EventTimeWindow(time) { + size: Expression) extends EventTimeWindow(alias, time) { override private[flink] def toLogicalWindow: LogicalWindow = EventTimeTumblingGroupWindow(alias, time, size) @@ -262,16 +319,16 @@ class TumblingEventTimeWindow( // ------------------------------------------------------------------------------------------------ /** - * Partially specified sliding window. + * A partially specified sliding window. * - * @param size the size of the window either as time or row-count interval. + * @param size the size of the window either a time or a row-count interval. */ class SlideWithSize(size: Expression) { /** - * Partially specified sliding window. + * A partially specified sliding window. * - * @param size the size of the window either as time or row-count interval. + * @param size the size of the window either a time or a row-count interval. */ def this(size: String) = this(ExpressionParser.parseExpression(size)) @@ -286,9 +343,9 @@ class SlideWithSize(size: Expression) { * windows. * * @param slide the slide of the window either as time or row-count interval. - * @return a sliding window + * @return a predefined sliding window. */ - def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide) + def every(slide: Expression): SlideWithSlide = new SlideWithSlide(size, slide) /** * Specifies the window's slide as time or row-count interval. @@ -301,24 +358,17 @@ class SlideWithSize(size: Expression) { * windows. * * @param slide the slide of the window either as time or row-count interval. - * @return a sliding window + * @return a predefined sliding window. */ - def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide)) + def every(slide: String): WindowWithoutAlias = every(ExpressionParser.parseExpression(slide)) } /** - * Sliding window. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are - * grouped by processing-time. - * - * @param size the size of the window either as time or row-count interval. + * A partially defined sliding window. */ -class SlidingWindow( +class SlideWithSlide( size: Expression, - slide: Expression) - extends Window { - + slide: Expression) extends ProcTimeWindowWithoutAlias { /** * Specifies the time attribute on which rows are grouped. * @@ -328,37 +378,65 @@ class SlidingWindow( * For batch tables, refer to a timestamp or long attribute. * * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a sliding window on event-time + * @return a predefined Sliding window on event-time. */ - def on(timeField: Expression): SlidingEventTimeWindow = - new SlidingEventTimeWindow(timeField, size, slide) + override def on(timeField: Expression): SlideWithoutAlias = + new SlideWithoutAlias(timeField, size, slide) /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. * - * For batch tables, refer to a timestamp or long attribute. + * @param alias alias for this window + * @return this window + */ + override def as(alias: Expression): Window = new SlidingWindow(alias, size, slide) +} + +/** + * A partially defined sliding window on event-time without alias. + */ +class SlideWithoutAlias( + timeField: Expression, + size: Expression, + slide: Expression) extends WindowWithoutAlias { + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. * - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a sliding window on event-time + * @param alias alias for this window + * @return this window */ - def on(timeField: String): SlidingEventTimeWindow = - on(ExpressionParser.parseExpression(timeField)) + override def as(alias: Expression): Window = + new SlidingEventTimeWindow(alias, timeField, size, slide) +} + +/** + * A sliding window on processing-time. + * + * @param alias the alias of the window. + * @param size the size of the window either a time or a row-count interval. + * @param slide the interval by which the window slides. + */ +class SlidingWindow( + alias: Expression, + size: Expression, + slide: Expression) + extends Window(alias) { override private[flink] def toLogicalWindow: LogicalWindow = ProcessingTimeSlidingGroupWindow(alias, size, slide) } /** - * Sliding window on event-time. + * A sliding window on event-time. */ class SlidingEventTimeWindow( + alias: Expression, timeField: Expression, size: Expression, slide: Expression) - extends EventTimeWindow(timeField) { + extends EventTimeWindow(alias, timeField) { override private[flink] def toLogicalWindow: LogicalWindow = EventTimeSlidingGroupWindow(alias, timeField, size, slide) @@ -369,23 +447,10 @@ class SlidingEventTimeWindow( // ------------------------------------------------------------------------------------------------ /** - * Session window. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are - * grouped by processing-time. - * - * @param gap the time interval of inactivity before a window is closed. + * A partially defined session window. */ -class SessionWindow(gap: Expression) extends Window { +class SessionWithGap(gap: Expression) extends ProcTimeWindowWithoutAlias { - /** - * Session window. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. - * - * @param gap the time interval of inactivity before a window is closed. - */ def this(gap: String) = this(ExpressionParser.parseExpression(gap)) /** @@ -397,36 +462,56 @@ class SessionWindow(gap: Expression) extends Window { * For batch tables, refer to a timestamp or long attribute. * * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a session window on event-time + * @return an on event-time session window on event-time */ - def on(timeField: Expression): SessionEventTimeWindow = - new SessionEventTimeWindow(timeField, gap) + override def on(timeField: Expression): SessionWithoutAlias = + new SessionWithoutAlias(timeField, gap) /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. * - * For batch tables, refer to a timestamp or long attribute. + * @param alias alias for this window + * @return this window + */ + override def as(alias: Expression): Window = new SessionWindow(alias, gap) +} + +/** + * A partially defined session window on event-time without alias. + */ +class SessionWithoutAlias( + timeField: Expression, + gap: Expression) extends WindowWithoutAlias { + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. * - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a session window on event-time + * @param alias alias for this window + * @return this window */ - def on(timeField: String): SessionEventTimeWindow = - on(ExpressionParser.parseExpression(timeField)) + override def as(alias: Expression): Window = new SessionEventTimeWindow(alias, timeField, gap) +} + +/** + * A session window on processing-time. + * + * @param gap the time interval of inactivity before a window is closed. + */ +class SessionWindow(alias: Expression, gap: Expression) extends Window(alias) { override private[flink] def toLogicalWindow: LogicalWindow = ProcessingTimeSessionGroupWindow(alias, gap) } /** - * Session window on event-time. + * A session window on event-time. */ class SessionEventTimeWindow( + alias: Expression, timeField: Expression, gap: Expression) - extends EventTimeWindow(timeField) { + extends EventTimeWindow(alias, timeField) { override private[flink] def toLogicalWindow: LogicalWindow = EventTimeSessionGroupWindow(alias, timeField, gap) http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala index 1264566..1884e54 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala @@ -22,14 +22,13 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.expressions.{Expression, WindowReference} import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} -abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] { +abstract class LogicalWindow(val alias: Expression) extends Resolvable[LogicalWindow] { def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this def validate(tableEnv: TableEnvironment): ValidationResult = alias match { - case Some(WindowReference(_)) => ValidationSuccess - case Some(_) => ValidationFailure("Window reference for window expected.") - case None => ValidationSuccess + case WindowReference(_) => ValidationSuccess + case _ => ValidationFailure("Window reference for window expected.") } override def toString: String = getClass.getSimpleName http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala index f1f058e..576756d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeIn import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} abstract class EventTimeGroupWindow( - alias: Option[Expression], + alias: Expression, time: Expression) extends LogicalWindow(alias) { @@ -55,7 +55,7 @@ abstract class EventTimeGroupWindow( } } -abstract class ProcessingTimeGroupWindow(alias: Option[Expression]) extends LogicalWindow(alias) { +abstract class ProcessingTimeGroupWindow(alias: Expression) extends LogicalWindow(alias) { override def validate(tableEnv: TableEnvironment): ValidationResult = { val valid = super.validate(tableEnv) if (valid.isFailure) { @@ -88,13 +88,13 @@ object TumblingGroupWindow { } case class ProcessingTimeTumblingGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, size: Expression) extends ProcessingTimeGroupWindow(alias) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = ProcessingTimeTumblingGroupWindow( - alias.map(resolve), + resolve(alias), resolve(size)) override def validate(tableEnv: TableEnvironment): ValidationResult = @@ -104,7 +104,7 @@ case class ProcessingTimeTumblingGroupWindow( } case class EventTimeTumblingGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, timeField: Expression, size: Expression) extends EventTimeGroupWindow( @@ -113,7 +113,7 @@ case class EventTimeTumblingGroupWindow( override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = EventTimeTumblingGroupWindow( - alias.map(resolve), + resolve(alias), resolve(timeField), resolve(size)) @@ -177,14 +177,14 @@ object SlidingGroupWindow { } case class ProcessingTimeSlidingGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, size: Expression, slide: Expression) extends ProcessingTimeGroupWindow(alias) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = ProcessingTimeSlidingGroupWindow( - alias.map(resolve), + resolve(alias), resolve(size), resolve(slide)) @@ -195,7 +195,7 @@ case class ProcessingTimeSlidingGroupWindow( } case class EventTimeSlidingGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, timeField: Expression, size: Expression, slide: Expression) @@ -203,7 +203,7 @@ case class EventTimeSlidingGroupWindow( override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = EventTimeSlidingGroupWindow( - alias.map(resolve), + resolve(alias), resolve(timeField), resolve(size), resolve(slide)) @@ -240,13 +240,13 @@ object SessionGroupWindow { } case class ProcessingTimeSessionGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, gap: Expression) extends ProcessingTimeGroupWindow(alias) { override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = ProcessingTimeSessionGroupWindow( - alias.map(resolve), + resolve(alias), resolve(gap)) override def validate(tableEnv: TableEnvironment): ValidationResult = @@ -256,7 +256,7 @@ case class ProcessingTimeSessionGroupWindow( } case class EventTimeSessionGroupWindow( - override val alias: Option[Expression], + override val alias: Expression, timeField: Expression, gap: Expression) extends EventTimeGroupWindow( @@ -265,7 +265,7 @@ case class EventTimeSessionGroupWindow( override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = EventTimeSessionGroupWindow( - alias.map(resolve), + resolve(alias), resolve(timeField), resolve(gap)) http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 559bd75..f1bb644 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -568,7 +568,7 @@ case class WindowAggregate( case _ => window.alias match { // resolve reference to this window's alias - case Some(UnresolvedFieldReference(alias)) if name == alias => + case UnresolvedFieldReference(alias) if name == alias => // check if reference can already be resolved by input fields val found = super.resolveReference(tableEnv, name) if (found.isDefined) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index c68da04..4da2da9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -53,8 +53,8 @@ class WindowStartEndPropertiesRule transformed.push(LogicalWindowAggregate.create( agg.getWindow, Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get)) + NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias)) ), agg) ) http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala index cd6c77d..9d16536 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala @@ -46,7 +46,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("select", "ts, a, b") ), - term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 7200000.millis)), + term("window", EventTimeTumblingGroupWindow('w$, 'ts, 7200000.millis)), term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) @@ -75,7 +75,7 @@ class WindowAggregateTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "c"), - term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 240000.millis)), + term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), @@ -104,7 +104,7 @@ class WindowAggregateTest extends TableTestBase { term("select", "ts, a, b") ), term("window", - EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 900000.millis)), + EventTimeSlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)), term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) @@ -134,7 +134,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("groupBy", "c, d"), term("window", - EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)), + EventTimeSlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)), term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " + "start('w$) AS w$start, end('w$) AS w$end") ), @@ -160,7 +160,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("select", "ts") ), - term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 1800000.millis)), + term("window", EventTimeSessionGroupWindow('w$, 'ts, 1800000.millis)), term("select", "COUNT(*) AS cnt") ) @@ -189,7 +189,7 @@ class WindowAggregateTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "c, d"), - term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 43200000.millis)), + term("window", EventTimeSessionGroupWindow('w$, 'ts, 43200000.millis)), term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index 4d0d9aa..6ebfec0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -239,7 +239,7 @@ class FieldProjectionTest extends TableTestBase { ), term("window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.millis)), term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1") @@ -268,7 +268,7 @@ class FieldProjectionTest extends TableTestBase { term("groupBy", "b"), term("window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.millis)), term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1") http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala index a778b40..8f4eea5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala @@ -97,7 +97,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 2.rows)), + term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -118,7 +118,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 5.milli)), + term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -164,7 +164,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("select", "int", "long") ), - term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 5.milli)), + term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -188,7 +188,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("select", "int", "long") ), - term("window", EventTimeTumblingGroupWindow(Some(WindowReference("w")), 'long, 2.rows)), + term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -236,7 +236,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 8.milli, 10.milli)), + EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -258,7 +258,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 1.rows)), + EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -294,7 +294,7 @@ class GroupWindowTest extends TableTestBase { term("select", "int", "long") ), term("window", - EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 8.milli, 10.milli)), + EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -319,7 +319,7 @@ class GroupWindowTest extends TableTestBase { term("select", "int", "long") ), term("window", - EventTimeSlidingGroupWindow(Some(WindowReference("w")), 'long, 2.rows, 1.rows)), + EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -344,7 +344,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeSessionGroupWindow(Some(WindowReference("w")), 'long, 7.milli)), + term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 324b4d6..578a6a8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -102,7 +102,7 @@ class WindowAggregateTest extends TableTestBase { streamTableNode(0), term("select", "1970-01-01 00:00:00 AS $f0") ), - term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 900000.millis)), + term("window", EventTimeTumblingGroupWindow('w$, 'rowtime, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end") ), term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end") @@ -128,7 +128,7 @@ class WindowAggregateTest extends TableTestBase { streamTableNode(0), term("select", "1970-01-01 00:00:00 AS $f0") ), - term("window", ProcessingTimeSlidingGroupWindow(Some('w$), + term("window", ProcessingTimeSlidingGroupWindow('w$, 3600000.millis, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end") ), @@ -156,7 +156,7 @@ class WindowAggregateTest extends TableTestBase { streamTableNode(0), term("select", "1970-01-01 00:00:00 AS $f0") ), - term("window", ProcessingTimeSessionGroupWindow(Some('w$), 900000.millis)), + term("window", ProcessingTimeSessionGroupWindow('w$, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end") ), term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end") http://git-wip-us.apache.org/repos/asf/flink/blob/bb972b85/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index fde7682..402ebbe 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -195,7 +195,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeTumblingGroupWindow( - Some(WindowReference("w1")), + WindowReference("w1"), 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ), @@ -204,7 +204,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeSlidingGroupWindow( - Some(WindowReference("w2")), + WindowReference("w2"), 20.milli, 10.milli)), term("select", "COUNT(string) AS TMP_1") ) @@ -232,7 +232,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -261,7 +261,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeTumblingGroupWindow( - Some(WindowReference("w")), 2.rows)), + WindowReference("w"), 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -289,7 +289,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.milli)), term("select", "string", "COUNT(int) AS TMP_0") @@ -316,7 +316,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -345,7 +345,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 50.milli, 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -374,7 +374,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -403,7 +403,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 8.milli, 10.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -429,7 +429,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -458,7 +458,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSessionGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -487,7 +487,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -515,7 +515,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -543,7 +543,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -572,7 +572,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -601,7 +601,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 50.milli, 50.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -629,7 +629,7 @@ class GroupWindowTest extends TableTestBase { term( "window", ProcessingTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -657,7 +657,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 8.milli, 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -686,7 +686,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -714,7 +714,7 @@ class GroupWindowTest extends TableTestBase { term( "window", EventTimeSessionGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 7.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -742,7 +742,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term("window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.milli)), term("select", @@ -775,7 +775,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term("window", EventTimeSlidingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 10.milli, 5.milli)), @@ -811,7 +811,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term("window", EventTimeSessionGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 3.milli)), term("select", @@ -849,7 +849,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term("window", EventTimeTumblingGroupWindow( - Some(WindowReference("w")), + WindowReference("w"), RowtimeAttribute(), 5.millis)), term("select",