[
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551476#comment-15551476
]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82150960
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{StreamTableEnvironment,
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo,
TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+ name: Option[Expression],
+ time: Expression)
+ extends LogicalWindow(name) {
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult = {
+ val valid = super.validate(tableEnv)
+ if (valid.isFailure) {
+ return valid
+ }
+
+ if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+ val valid = time match {
+ case RowtimeAttribute() =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure("Event-time window expects a 'rowtime' time
field.")
+ }
+ if (valid.isFailure) {
+ return valid
+ }
+ }
+ if (!TypeCoercion.canCast(time.resultType,
BasicTypeInfo.LONG_TYPE_INFO)) {
+ ValidationFailure(s"Event-time window expects a time field that can
be safely cast " +
+ s"to Long, but is ${time.resultType}")
+ } else {
+ ValidationSuccess
+ }
+ }
+}
+
+abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends
LogicalWindow(name)
+
+//
------------------------------------------------------------------------------------------------
+// Tumbling group windows
+//
------------------------------------------------------------------------------------------------
+
+object TumblingGroupWindow {
+ def validate(tableEnv: TableEnvironment, size: Expression):
ValidationResult = size match {
+ case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) =>
+ ValidationSuccess
+ case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) | Literal(_,
BasicTypeInfo.INT_TYPE_INFO) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure(
+ "Tumbling window expects size literal of type Interval of
Milliseconds or Long/Integer.")
+ }
+}
+
+case class ProcessingTimeTumblingGroupWindow(
+ name: Option[Expression],
+ size: Expression)
+ extends ProcessingTimeGroupWindow(name) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ ProcessingTimeTumblingGroupWindow(
+ name.map(resolve),
+ resolve(size))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv,
size))
+
+ override def toString: String =
s"ProcessingTimeTumblingGroupWindow($name, $size)"
+}
+
+case class EventTimeTumblingGroupWindow(
+ name: Option[Expression],
+ timeField: Expression,
+ size: Expression)
+ extends EventTimeGroupWindow(
+ name,
+ timeField) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ EventTimeTumblingGroupWindow(
+ name.map(resolve),
+ resolve(timeField),
+ resolve(size))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv,
size))
+
+ override def toString: String = s"EventTimeTumblingGroupWindow($name,
$timeField, $size)"
+}
+
+//
------------------------------------------------------------------------------------------------
+// Sliding group windows
+//
------------------------------------------------------------------------------------------------
+
+object SlidingGroupWindow {
+ def validate(
+ tableEnv: TableEnvironment,
+ size: Expression,
+ slide: Expression)
+ : ValidationResult = {
+
+ val checkedSize = size match {
+ case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) =>
+ ValidationSuccess
+ case Literal(_, BasicTypeInfo.INT_TYPE_INFO |
BasicTypeInfo.LONG_TYPE_INFO) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure(
+ "Sliding window expects size literal of type Interval of
Milliseconds or Long/Integer.")
+ }
+
+ val checkedSlide = slide match {
+ case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) =>
+ ValidationSuccess
+ case Literal(_, BasicTypeInfo.INT_TYPE_INFO |
BasicTypeInfo.LONG_TYPE_INFO) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure(
+ "Sliding window expects slide literal of type Interval of
Milliseconds or Long/Integer.")
+ }
+
+ checkedSize
+ .orElse(checkedSlide)
+ .orElse {
+ if (size.resultType != slide.resultType) {
+ ValidationFailure("Sliding window expects same type of size and
slide.")
+ } else {
+ ValidationSuccess
+ }
+ }
+ }
+}
+
+case class ProcessingTimeSlidingGroupWindow(
+ name: Option[Expression],
+ size: Expression,
+ slide: Expression)
+ extends ProcessingTimeGroupWindow(name) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ ProcessingTimeSlidingGroupWindow(
+ name.map(resolve),
+ resolve(size),
+ resolve(slide))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv,
size, slide))
+
+ override def toString: String =
s"ProcessingTimeSlidingGroupWindow($name, $size, $slide)"
+}
+
+case class EventTimeSlidingGroupWindow(
+ name: Option[Expression],
+ timeField: Expression,
+ size: Expression,
+ slide: Expression)
+ extends EventTimeGroupWindow(name, timeField) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ EventTimeSlidingGroupWindow(
+ name.map(resolve),
+ resolve(timeField),
+ resolve(size),
+ resolve(slide))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv,
size, slide))
+
+ override def toString: String = s"EventTimeSlidingGroupWindow($name,
$timeField, $size, $slide)"
+}
+
+//
------------------------------------------------------------------------------------------------
+// Session group windows
+//
------------------------------------------------------------------------------------------------
+
+object SessionGroupWindow {
+
+ def validate(tableEnv: TableEnvironment, gap: Expression):
ValidationResult = gap match {
+ case Literal(timeInterval: Long, IntervalTypeInfo.INTERVAL_MILLIS) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure(
+ "Session window expects gap literal of type Interval of
Milliseconds.")
+ }
+}
+
+case class ProcessingTimeSessionGroupWindow(
+ name: Option[Expression],
+ gap: Expression)
+ extends ProcessingTimeGroupWindow(name) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ ProcessingTimeSessionGroupWindow(
+ name.map(resolve),
+ resolve(gap))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv,
gap))
+
+ override def toString: String =
s"ProcessingTimeSessionGroupWindow($name, $gap)"
+}
+
+case class EventTimeSessionGroupWindow(
+ name: Option[Expression],
+ timeField: Expression,
+ gap: Expression)
+ extends EventTimeGroupWindow(
+ name,
+ timeField) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression):
LogicalWindow =
+ EventTimeSessionGroupWindow(
+ name.map(resolve),
+ resolve(timeField),
+ resolve(gap))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv,
gap))
+
+ override def toString: String =
s"ProcessingTimeSessionGroupWindow($name, $timeField, $gap)"
--- End diff --
Done.
> Add group-windows for streaming tables
> ---------------------------------------
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be initially supported and will throw an
> exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)