[
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551712#comment-15551712
]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82170078
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression,
ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+ * Group-window specification. Group-windows allow aggregates which are
computed for a group of
+ * elements. A (time or row-count) window is required to bound the
infinite input stream into a
+ * finite group. Group-windows are evaluated once per group.
+ */
+trait GroupWindow {
+
+ /**
+ * Converts an API class to a logical window for planning. This is an
internal method.
+ */
+ private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+ * A group-window operating on event-time.
+ *
+ * @param timeField defines the time mode for streaming tables and acts
as a time attribute for
+ * batch tables over which the query is evaluated.
+ */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+ protected var name: Option[Expression] = None
+
+ /**
+ * Assigns an alias for this window that the following `select()`
clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): EventTimeWindow = {
+ this.name = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()`
clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): EventTimeWindow =
as(ExpressionParser.parseExpression(alias))
+}
+
+//
------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+//
------------------------------------------------------------------------------------------------
+
+/**
+ * Tumbling group-window. By default, it works on processing-time.
+ *
+ * @param size size of the window either as number of rows or interval of
milliseconds
+ */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+ /**
+ * Tumbling group-window. By default, it works on processing-time.
+ *
+ * @param size size of the window either as number of rows or interval
of milliseconds
+ */
+ def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+ private var alias: Option[Expression] = None
+
+ /**
+ * Defines the time mode for streaming tables and specifies a time
attribute for
+ * batch tables over which the query is evaluated.
+ *
+ * @param timeField time mode for streaming tables and time attribute
for batch tables
+ * @return a tumbling group-window on event-time
+ */
+ def on(timeField: Expression): TumblingEventTimeWindow =
+ new TumblingEventTimeWindow(alias, timeField, size)
+
+ /**
+ * Defines the time mode for streaming tables and specifies a time
attribute for
+ * batch tables over which the query is evaluated.
+ *
+ * @param timeField time mode for streaming tables and time attribute
for batch tables
+ * @return a tumbling group-window on event-time
+ */
+ def on(timeField: String): TumblingEventTimeWindow =
+ on(ExpressionParser.parseExpression(timeField))
+
+ /**
+ * Assigns an alias for this window that the following `select()`
clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: Expression): TumblingWindow = {
+ this.alias = Some(alias)
+ this
+ }
+
+ /**
+ * Assigns an alias for this window that the following `select()`
clause can refer to in order
+ * to access window properties such as window start or end time.
+ *
+ * @param alias alias for this window
+ * @return this window
+ */
+ def as(alias: String): TumblingWindow =
as(ExpressionParser.parseExpression(alias))
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ ProcessingTimeTumblingGroupWindow(alias, size)
+}
+
+/**
+ * Tumbling group-window on event-time.
+ */
+class TumblingEventTimeWindow(
+ alias: Option[Expression],
+ time: Expression,
+ size: Expression)
+ extends EventTimeWindow(time) {
+
+ override private[flink] def toLogicalWindow: LogicalWindow =
+ EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
+}
+
+//
------------------------------------------------------------------------------------------------
+// Sliding group windows
+//
------------------------------------------------------------------------------------------------
+
+/**
+ * Partially specified sliding window.
+ *
+ * @param size size of the window either as number of rows or interval of
milliseconds
+ */
+class SlideWithSize(size: Expression) {
+
+ /**
+ * Partially specified sliding window.
+ *
+ * @param size size of the window either as number of rows or interval
of milliseconds
+ */
+ def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+ /**
+ * Specifies the window's slide. This is the size of the overlap.
--- End diff --
I tried to be in sync with the online documentation. But I can add your
sentence as well. The more explanation the better.
> Add group-windows for streaming tables
> ---------------------------------------
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be initially supported and will throw an
> exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)