[
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548960#comment-15548960
]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r81847736
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
---
@@ -675,11 +692,77 @@ class GroupedTable(
* Example:
*
* {{{
- * tab.groupBy("key").select("key, value.avg + " The average" as
average")
+ * tab.groupBy("key").select("key, value.avg + ' The average' as
average")
+ * }}}
+ */
+ def select(fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields)
+ select(fieldExprs: _*)
+ }
+
+ /**
+ * Windows a table to divide a (potentially) infinite stream of records
into finite slices
+ * based on the timestamps of elements or other criteria. This division
is required when
+ * working with infinite data and performing transformations that
aggregate elements.
+ *
+ * @param groupWindow group-window specification required to bound the
infinite input stream
+ * into a finite group
+ * @return group-windowed table
+ */
+ def window(groupWindow: GroupWindow): GroupWindowedTable = {
+ if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+ throw new ValidationException(s"Windows on batch tables are
currently not supported.")
+ }
+ new GroupWindowedTable(table, groupKey, groupWindow)
+ }
+}
+
+class GroupWindowedTable(
+ private[flink] val table: Table,
+ private[flink] val groupKey: Seq[Expression],
+ private[flink] val window: GroupWindow) {
+
+ /**
+ * Performs a selection operation on a group-windowed table. Similar to
an SQL SELECT statement.
+ * The field expressions can contain complex expressions and
aggregations.
+ *
+ * Example:
+ *
+ * {{{
+ * groupWindowTable.select('key, 'window.start, 'value.avg + " The
average" as 'average)
+ * }}}
+ */
+ def select(fields: Expression*): Table = {
+ val projectionOnAggregates = fields.map(extractAggregations(_,
table.tableEnv))
+ val aggregations = projectionOnAggregates.flatMap(_._2)
+
+ val groupWindow = window.toLogicalWindow
+
+ val logical = if (aggregations.nonEmpty) {
+ Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+ WindowAggregate(groupKey, groupWindow, aggregations,
table.logicalPlan)
+ .validate(table.tableEnv))
+ } else {
+ Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+ WindowAggregate(groupKey, groupWindow, Nil,
table.logicalPlan).validate(table.tableEnv))
+ }
+
+ new Table(table.tableEnv, logical.validate(table.tableEnv))
--- End diff --
Wasn't `logical` validated before?
> Add group-windows for streaming tables
> ---------------------------------------
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implement group-windows on streaming tables. This includes the group-window
> API, the logical validation of group-windows, and the definition of the
> “rowtime” and “systemtime” keywords. Group-windows on batch tables will not
> be supported initially; attempting to use them will throw an exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)