This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push: new 44c2474 [FLINK-11918][table] Deprecated some Window APIs and Rename Window to GroupWindow 44c2474 is described below commit 44c24749998120695e43e7dbea3aa7b6bc8b00bf Author: hequn8128 <chenghe...@gmail.com> AuthorDate: Thu Mar 14 18:06:19 2019 +0800 [FLINK-11918][table] Deprecated some Window APIs and Rename Window to GroupWindow This closes #7985 --- docs/dev/table/tableApi.md | 14 +- .../org/apache/flink/table/api/java/windows.scala | 88 ++------- .../org/apache/flink/table/api/scala/windows.scala | 90 +++------ .../scala/org/apache/flink/table/api/table.scala | 57 +++++- .../scala/org/apache/flink/table/api/windows.scala | 217 ++++++++++++++++++++- .../table/api/batch/table/GroupWindowTest.scala | 1 + .../validation/GroupWindowValidationTest.scala | 2 +- .../validation/OverWindowValidationTest.scala | 2 +- .../table/api/stream/table/AggregateTest.scala | 1 + .../flink/table/api/stream/table/CalcTest.scala | 1 + .../table/api/stream/table/GroupWindowTest.scala | 1 + .../table/api/stream/table/OverWindowTest.scala | 2 +- .../table/api/stream/table/TableSourceTest.scala | 2 +- .../stringexpr/AggregateStringExpressionTest.scala | 6 +- .../GroupWindowStringExpressionTest.scala | 14 +- .../OverWindowStringExpressionTest.scala | 48 ++--- .../table/validation/CalcValidationTest.scala | 4 +- .../validation/GroupWindowValidationTest.scala | 2 +- .../validation/OverWindowValidationTest.scala | 2 +- .../flink/table/plan/RetractionRulesTest.scala | 2 +- .../table/plan/TimeIndicatorConversionTest.scala | 1 + .../flink/table/plan/UpdatingPlanCheckerTest.scala | 2 +- .../runtime/batch/table/GroupWindowITCase.scala | 1 + .../runtime/batch/table/TableSourceITCase.scala | 2 +- .../runtime/stream/TimeAttributesITCase.scala | 2 +- .../runtime/stream/table/GroupWindowITCase.scala | 2 +- .../table/runtime/stream/table/JoinITCase.scala | 2 +- .../runtime/stream/table/OverWindowITCase.scala | 1 + .../runtime/stream/table/TableSinkITCase.scala | 2 +- .../runtime/stream/table/TableSourceITCase.scala | 2 +- 30 files changed, 376 insertions(+), 197 deletions(-) diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index a509d85..f4c74cc 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1271,14 +1271,14 @@ orders.insertInto("OutOrders") Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals. -Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. +Windows are defined using the `window(w: GroupWindow)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. The following example shows how to define a window aggregation on a table. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w") // group the table by window w .select("b.sum"); // aggregate {% endhighlight %} @@ -1287,7 +1287,7 @@ Table table = input <div data-lang="scala" markdown="1"> {% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w) // group the table by window w .select('b.sum) // aggregate {% endhighlight %} @@ -1301,7 +1301,7 @@ The following example shows how to define a window aggregation with additional g <div data-lang="java" markdown="1"> {% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, b.sum"); // aggregate {% endhighlight %} @@ -1310,7 +1310,7 @@ Table table = input <div data-lang="scala" markdown="1"> {% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w, 'a) // group the table by attribute a and window w .select('a, 'b.sum) // aggregate {% endhighlight %} @@ -1323,7 +1323,7 @@ Window properties such as the start, end, or rowtime timestamp of a time window <div data-lang="java" markdown="1"> {% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} @@ -1332,7 +1332,7 @@ Table table = input <div data-lang="scala" markdown="1"> {% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w, 'a) // group the table by attribute a and window w .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala index 121aab8..8cfeca8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -19,26 +19,19 @@ package org.apache.flink.table.api.java import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE} -import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} +import org.apache.flink.table.api.{OverWindow, OverWindowWithPreceding} import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.api._ /** * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups * elements in 5 minutes intervals. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]]. */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping - * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a partially defined tumbling window - */ - def over(size: String): TumbleWithSize = new TumbleWithSize(size) -} +@Deprecated +object Tumble extends TumbleBase /** * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by @@ -48,71 +41,30 @@ object Tumble { * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive * window evaluations. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]]. */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. Each element is contained in three - * consecutive window evaluations. - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: String): SlideWithSize = new SlideWithSize(size) -} +@Deprecated +object Slide extends SlideBase /** * Helper class for creating a session window. The boundary of session windows are defined by * intervals of inactivity, i.e., a session window is closes if no event appears for a defined * gap period. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]]. */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a partially defined session window - */ - def withGap(gap: String): SessionWithGap = new SessionWithGap(gap) -} +@Deprecated +object Session extends SessionBase /** - * Helper object for creating a over window. + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]]. */ -object Over { - - /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. - * - * For batch tables, refer to a timestamp or long attribute. - */ - def orderBy(orderBy: String): OverWindowWithOrderBy = { - val orderByExpr = ExpressionParser.parseExpression(orderBy) - new OverWindowWithOrderBy(Array[Expression](), orderByExpr) - } - - /** - * Partitions the elements on some partition keys. - * - * @param partitionBy some partition keys. - * @return A partitionedOver instance that only contains the orderBy method. - */ - def partitionBy(partitionBy: String): PartitionedOver = { - val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray - new PartitionedOver(partitionByExpr) - } -} +@Deprecated +object Over extends OverBase class PartitionedOver(private val partitionByExpr: Array[Expression]) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 2f88248..1172822 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -18,26 +18,20 @@ package org.apache.flink.table.api.scala -import org.apache.flink.table.api.{OverWindow, TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap} +import org.apache.flink.table.api.{OverWindow, OverWindowWithPreceding} import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.api._ /** * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups * elements in 5 minutes intervals. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]]. */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping - * windows. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a partially defined tumbling window - */ - def over(size: Expression): TumbleWithSize = new TumbleWithSize(size) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Tumble.", "1.8") +object Tumble extends TumbleBase /** * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by @@ -47,69 +41,33 @@ object Tumble { * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive * window evaluations. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]]. */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. Each element is contained in three - * consecutive - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: Expression): SlideWithSize = new SlideWithSize(size) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Slide.", "1.8") +object Slide extends SlideBase /** * Helper object for creating a session window. The boundary of session windows are defined by * intervals of inactivity, i.e., a session window is closes if no event appears for a defined * gap period. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]]. */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a partially defined session window - */ - def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Session.", "1.8") +object Session extends SessionBase /** - * Helper object for creating a over window. + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]]. */ -object Over { - - /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode. - * - * For batch tables, refer to a timestamp or long attribute. - */ - def orderBy(orderBy: Expression): OverWindowWithOrderBy = { - new OverWindowWithOrderBy(Seq[Expression](), orderBy) - } - - /** - * Partitions the elements on some partition keys. - * - * @param partitionBy some partition keys. - * @return A partitionedOver instance that only contains the orderBy method. - */ - def partitionBy(partitionBy: Expression*): PartitionedOver = { - PartitionedOver(partitionBy.toArray) - } -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Over.", "1.8") +object Over extends OverBase case class PartitionedOver(partitionBy: Array[Expression]) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala index e2f1161..4e7b918 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala @@ -1165,12 +1165,40 @@ class Table( * * @param window window that specifies how elements are grouped. * @return A windowed table. + * + * @deprecated Will be removed in a future release. Please use Table.window(window: GroupWindow) + * instead. */ + @deprecated( + "This method will be removed. Please use Table.window(window: GroupWindow) instead.", + "1.8") + @Deprecated def window(window: Window): WindowedTable = { new WindowedTable(this, window) } /** + * Groups the records of a table by assigning them to windows defined by a time or row interval. + * + * For streaming tables of infinite size, grouping into windows is required to define finite + * groups on which group-based aggregates can be computed. + * + * For batch tables of finite size, windowing essentially provides shortcuts for time-based + * groupBy. + * + * __Note__: Computing windowed aggregates on a streaming table is only a parallel operation + * if additional grouping attributes are added to the `groupBy(...)` clause. + * If the `groupBy(...)` only references a window alias, the streamed table will be processed + * by a single task, i.e., with parallelism 1. + * + * @param window group window that specifies how elements are grouped. + * @return A group windowed table. + */ + def window(window: GroupWindow): GroupWindowedTable = { + new GroupWindowedTable(this, window) + } + + /** * Defines over-windows on the records of a table. * * An over-window defines for each record an interval of records over which aggregation @@ -1291,6 +1319,14 @@ class GroupedTable( } } +/** + * A table that has been windowed for grouping [[Window]]s. + * + * @deprecated Will be replaced by [[GroupWindowedTable]]. + */ +@Deprecated +@deprecated( + "This class will be replaced by GroupWindowedTable.", "1.8") class WindowedTable( private[flink] val table: Table, private[flink] val window: Window) { @@ -1343,6 +1379,15 @@ class WindowedTable( } +/** + * A table that has been windowed for [[GroupWindow]]s. + */ +class GroupWindowedTable( + override private[flink] val table: Table, + override private[flink] val window: GroupWindow) + extends WindowedTable(table, window) + + class OverWindowedTable( private[flink] val table: Table, private[flink] val overWindows: Array[OverWindow]) { @@ -1379,11 +1424,21 @@ class OverWindowedTable( } } -class WindowGroupedTable( +/** + * A table that has been windowed and grouped for [[GroupWindow]]s. + * + * @deprecated The constructor contains [[Window]] parameter will be removed. Use constructor + * with [[GroupWindow]] instead. + */ +class WindowGroupedTable @Deprecated() ( private[flink] val table: Table, private[flink] val groupKeys: Seq[Expression], private[flink] val window: Window) { + def this(table: Table, groupKeys: Seq[Expression], window: GroupWindow) { + this(table, groupKeys, window.asInstanceOf[Window]) + } + /** * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. * The field expressions can contain complex expressions and aggregations. diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala index ee022b1..57f382c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala @@ -18,10 +18,11 @@ package org.apache.flink.table.api +import org.apache.flink.table.api.java.{PartitionedOver => JPartitionedOver, OverWindowWithOrderBy => JOverWindowWithOrderBy} import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} -import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW} +import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW, OverWindowWithOrderBy, PartitionedOver} /** * Over window is similar to the traditional OVER SQL. @@ -134,9 +135,13 @@ class OverWindowWithPreceding( * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping * is required to apply aggregations on streaming tables. * - * For finite batch tables, window provides shortcuts for time-based groupBy. + * For finite batch tables, group windows provide shortcuts for time-based groupBy. * + * @deprecated Will be replaced by [[GroupWindow]] */ +@Deprecated +@deprecated( + "This class will be replaced by GroupWindow.", "1.8") abstract class Window(val alias: Expression, val timeField: Expression) { /** @@ -146,6 +151,21 @@ abstract class Window(val alias: Expression, val timeField: Expression) { } +/** + * A group window specification. + * + * Group windows group rows based on time or row-count intervals and is therefore essentially a + * special type of groupBy. Just like groupBy, group windows allow to compute aggregates + * on groups of elements. + * + * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping + * is required to apply aggregations on streaming tables. + * + * For finite batch tables, group windows provide shortcuts for time-based groupBy. + */ +abstract class GroupWindow(alias: Expression, timeField: Expression) + extends Window(alias, timeField) + // ------------------------------------------------------------------------------------------------ // Tumbling windows // ------------------------------------------------------------------------------------------------ @@ -234,7 +254,7 @@ class TumbleWithSizeOnTimeWithAlias( alias: Expression, timeField: Expression, size: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -369,7 +389,7 @@ class SlideWithSizeAndSlideOnTimeWithAlias( timeField: Expression, size: Expression, slide: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -469,7 +489,7 @@ class SessionWithGapOnTimeWithAlias( alias: Expression, timeField: Expression, gap: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -480,3 +500,190 @@ class SessionWithGapOnTimeWithAlias( SessionGroupWindow(alias, timeField, gap) } } + +/** + * Base class for Tumble Window Helper classes. This class contains help methods to create Tumble + * Windows. + */ +class TumbleBase { + + /** + * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping + * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + * + * @param size the size of the window as time or row-count interval. + * @return a partially defined tumbling window + */ + def over(size: String): TumbleWithSize = new TumbleWithSize(size) + + /** + * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping + * windows. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + * + * @param size the size of the window as time or row-count interval. + * @return a partially defined tumbling window + */ + def over(size: Expression): TumbleWithSize = new TumbleWithSize(size) +} + +/** + * Base class for Slide Window Helper classes. This class contains help methods to create Slide + * Windows. + */ +class SlideBase { + + /** + * Creates a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups + * elements of 15 minutes and evaluates every five minutes. Each element is contained in three + * consecutive window evaluations. + * + * @param size the size of the window as time or row-count interval + * @return a partially specified sliding window + */ + def over(size: String): SlideWithSize = new SlideWithSize(size) + + /** + * Creates a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups + * elements of 15 minutes and evaluates every five minutes. Each element is contained in three + * consecutive + * + * @param size the size of the window as time or row-count interval + * @return a partially specified sliding window + */ + def over(size: Expression): SlideWithSize = new SlideWithSize(size) +} + +/** + * Base class for Session Window Helper classes. This class contains help methods to create Session + * Windows. + */ +class SessionBase { + + /** + * Creates a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + * + * @param gap specifies how long (as interval of milliseconds) to wait for new data before + * closing the session window. + * @return a partially defined session window + */ + def withGap(gap: String): SessionWithGap = new SessionWithGap(gap) + + /** + * Creates a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + * + * @param gap specifies how long (as interval of milliseconds) to wait for new data before + * closing the session window. + * @return a partially defined session window + */ + def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap) +} + +/** + * Base class for Over Window Helper classes. This class contains help methods to create Over + * Windows. + */ +class OverBase { + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: String): JOverWindowWithOrderBy = { + val orderByExpr = ExpressionParser.parseExpression(orderBy) + new JOverWindowWithOrderBy(Array[Expression](), orderByExpr) + } + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: Expression): OverWindowWithOrderBy = { + OverWindowWithOrderBy(Seq[Expression](), orderBy) + } + + /** + * Partitions the elements on some partition keys. + * + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. + * + * @param partitionBy list of field references + * @return an over window with defined partitioning + */ + def partitionBy(partitionBy: String): JPartitionedOver = { + val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray + new JPartitionedOver(partitionByExpr) + } + + /** + * Partitions the elements on some partition keys. + * + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. + * + * @param partitionBy list of field references + * @return an over window with defined partitioning + */ + def partitionBy(partitionBy: Expression*): PartitionedOver = { + PartitionedOver(partitionBy.toArray) + } +} + +/** + * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping + * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + */ +object Tumble extends TumbleBase + +/** + * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements + * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive + * window evaluations. + */ +object Slide extends SlideBase + +/** + * Helper class for creating a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + */ +object Session extends SessionBase + +/** + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. + */ +object Over extends OverBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala index 27c1d7f..46f35bc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.batch.table import java.sql.Timestamp import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.WindowReference diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala index 7cf1b82..15d7058 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.batch.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala index df8f7df..b19a523 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.batch.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0 import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala index afa9f8b..749120d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala index 5298ea4..84e685e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.Tumble import org.apache.flink.table.expressions.WindowReference import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala index a59ad83..ae533ee 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.WindowReference diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala index eeb7d5f..7603a8c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala @@ -21,7 +21,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.Func1 -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Over, Table} import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala index 5e3a410..c6ba354 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.api.{Over, TableSchema, Tumble, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{TableTestBase, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSourceWithTime} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala index 0833c24..63c7866 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Tumble import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.java.{Tumble => JTumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset} import org.apache.flink.table.utils.TableTestBase @@ -143,7 +143,7 @@ class AggregateStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("50.milli").on("proctime").as("w1")) + .window(Tumble.over("50.milli").on("proctime").as("w1")) .groupBy("w1, string") .select("w1.proctime as proctime, w1.start as start, w1.end as end, string, int.count") @@ -163,7 +163,7 @@ class AggregateStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("50.milli").on("rowtime").as("w1")) + .window(Tumble.over("50.milli").on("rowtime").as("w1")) .groupBy("w1, string") .select("w1.rowtime as rowtime, string, int.count") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index 2cb5a8a..2e05139 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -19,9 +19,9 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg -import org.apache.flink.table.api.java.{Session => JSession, Slide => JSlide, Tumble => JTumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.utils.TableTestBase import org.junit.Test @@ -53,7 +53,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w")) + .window(Slide.over("4.hours").every("2.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -92,7 +92,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("4.hours").on("rowtime").as("w")) + .window(Tumble.over("4.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -130,7 +130,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSession.withGap("4.hours").on("rowtime").as("w")) + .window(Session.withGap("4.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -167,7 +167,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w")) + .window(Slide.over("4.hours").every("2.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + @@ -204,7 +204,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("4.hours").on("proctime").as("w")) + .window(Tumble.over("4.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + @@ -241,7 +241,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSession.withGap("4.hours").on("proctime").as("w")) + .window(Session.withGap("4.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala index 99114b8..f3b312b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala @@ -19,10 +19,10 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Over import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg -import org.apache.flink.table.api.java.{Over => JOver} -import org.apache.flink.table.api.scala.{Over => SOver, _} +import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.Func1 import org.apache.flink.table.utils.TableTestBase import org.junit.Test @@ -38,10 +38,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) .select('a, 'b.sum over 'w as 'cnt, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) .select("a, SUM(b) OVER w as cnt, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -56,10 +56,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) + .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w")) + .window(Over.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -74,10 +74,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w) + .window(Over partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w")) + .window(Over.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -92,10 +92,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w) + .window(Over orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("10.rows").following("current_row").as("w")) + .window(Over.orderBy("rowtime").preceding("10.rows").following("current_row").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -110,10 +110,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -128,15 +128,15 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) + .window(Over orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t .window( - JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w")) + Over.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") val resJava2 = t .window( - JOver.orderBy("rowtime").as("w")) + Over.orderBy("rowtime").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -152,15 +152,15 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) + .window(Over orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t .window( - JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w")) + Over.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") val resJava2 = t .window( - JOver.orderBy("proctime").as("w")) + Over.orderBy("proctime").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -176,10 +176,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w) + .window(Over partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w")) + .window(Over.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -194,10 +194,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w) + .window(Over orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("4.hours").following("current_range").as("w")) + .window(Over.orderBy("rowtime").preceding("4.hours").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -214,7 +214,7 @@ class OverWindowStringExpressionTest extends TableTestBase { util.addFunction("weightedAvg", weightedAvg) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) .select( array('a.sum over 'w, 'a.count over 'w), plusOne('b.sum over 'w as 'wsum) as 'd, @@ -223,7 +223,7 @@ class OverWindowStringExpressionTest extends TableTestBase { "AVG:".toExpr + (weightedAvg('a, 'b) over 'w)) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) .select( s""" |ARRAY(SUM(a) OVER w, COUNT(a) OVER w), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala index 6e0e15e..03438c8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala @@ -20,8 +20,8 @@ package org.apache.flink.table.api.stream.table.validation import java.math.BigDecimal import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.api.scala.{Tumble, _} +import org.apache.flink.table.api.{Tumble, ValidationException} +import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala index 8d5943d..5a507a4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala index 37e1526..350ad06 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.api.{Over, Table, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithRetract} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala index eb9d4b2..5b4f364 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan import org.apache.calcite.rel.RelNode import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Table, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.nodes.datastream._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.CountDistinct diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 095cd04..7c00354 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.plan import java.sql.Timestamp import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Tumble import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala index bd2a868..a57cabd 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Table, Tumble} import org.apache.flink.table.plan.util.UpdatingPlanChecker import org.apache.flink.table.utils.StreamTableTestUtil import org.junit.Assert._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala index 0b6470f..80886bc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.batch.table import java.math.BigDecimal import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala index a0f6c53..b152dcb 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} import org.apache.flink.api.java.{DataSet, ExecutionEnvironment => JExecEnv} import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, TableSchema, Types} +import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types} import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase} import org.apache.flink.table.sources.BatchTableSource diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 2fb1fcd..a81b3ed 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.api.{TableSchema, Tumble, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index c7a71e4..d9b7b50 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.api.{Session, Slide, StreamQueryConfig, Tumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala index 926319f..6341620 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.api.{StreamQueryConfig, Tumble, Types} import org.apache.flink.table.expressions.Literal import org.apache.flink.table.expressions.utils.Func20 import org.apache.flink.table.functions.aggfunctions.CountAggFunction diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala index c0a95ab..bb1c813 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.Over import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithRetractAndReset, WeightedAvg} import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.table.api.scala._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 84d8d21..8080082 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, Types} +import org.apache.flink.table.api.{TableException, Tumble, Types} import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} import org.apache.flink.table.sinks._ import org.apache.flink.table.utils.MemoryTableSourceSinkUtil diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 52df3d5..eb571d5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, TableSchema, Types} +import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types} import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.table.utils._