[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551525#comment-15551525 ]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r82147669

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
@@ -0,0 +1,261 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.table.plan.nodes.datastream

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.table.expressions.{Expression, Literal}
import org.apache.flink.api.table.plan.logical._
import org.apache.flink.api.table.plan.nodes.FlinkAggregate
import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream}
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction}
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter}
import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}

import scala.collection.JavaConverters._

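/**
 * RelNode for a group-window aggregate on a streaming table: translates the
 * logical window and the aggregate calls into a windowed DataStream operator
 * in translateToPlan.
 */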
class DataStreamAggregate(
    window: LogicalWindow,
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputNode: RelNode,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    rowRelDataType: RelDataType,
    inputType: RelDataType,
    grouping: Array[Int])
  extends SingleRel(cluster, traitSet, inputNode)
  with FlinkAggregate
  with DataStreamRel {

  override def deriveRowType() = rowRelDataType

  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    new DataStreamAggregate(
      window,
      cluster,
      traitSet,
      inputs.get(0),
      namedAggregates,
      getRowType,
      inputType,
      grouping)
  }

  override def toString: String = {
    s"Aggregate(${ if (!grouping.isEmpty) {
      s"groupBy: (${groupingToString(inputType, grouping)}), "
    } else {
      ""
    }}window: ($window), " +
      s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))"
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw)
      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
      .item("window", window)
      .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates))
  }

  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {

    val config = tableEnv.getConfig

    val groupingKeys = grouping.indices.toArray
    // add grouping fields, position keys in the input, and input type
    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(
      namedAggregates,
      inputType,
      getRowType,
      grouping,
      config)

    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
      tableEnv,
      // tell the input operator that this operator currently only supports Rows as input
      Some(TypeConverter.DEFAULT_ROW_TYPE))

    // get the output types
    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
      .toArray

    val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates)
    val prepareOpName = s"prepare select: ($aggString)"
    val mappedInput = inputDS
      .map(aggregateResult._1)
      .name(prepareOpName)

    val groupReduceFunction = aggregateResult._2
    val rowTypeInfo = new RowTypeInfo(fieldTypes)

    val result = {
      // grouped / keyed aggregation
      if (groupingKeys.length > 0) {
        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
          s"window: ($window), " +
          s"select: ($aggString)"
        val aggregateFunction = new AggregateWindowFunction(groupReduceFunction)

        val keyedStream = mappedInput.keyBy(groupingKeys: _*)

        val windowedStream = createKeyedWindowedStream(window, keyedStream)
          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]

        windowedStream
          .apply(aggregateFunction)
          .returns(rowTypeInfo)
          .name(aggOpName)
          .asInstanceOf[DataStream[Any]]
      }
      // global / non-keyed aggregation
      else {
        val aggOpName = s"window: ($window), select: ($aggString)"
        val aggregateFunction = new AggregateAllWindowFunction(groupReduceFunction)

        val windowedStream = createNonKeyedWindowedStream(window, mappedInput)
          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]

        windowedStream
          .apply(aggregateFunction)
          .returns(rowTypeInfo)
          .name(aggOpName)
          .asInstanceOf[DataStream[Any]]
      }
    }

    // if the expected type is not a Row, inject a mapper to convert to the expected type
    expectedType match {
      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
        result.map(getConversionMapper(
          config = config,
          nullableInput = false,
          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
          expectedType = expectedType.get,
          conversionOperatorName = "DataStreamAggregateConversion",
          fieldNames = getRowType.getFieldNames.asScala
        ))
        .name(mapName)
      case _ => result
    }
  }
}

object DataStreamAggregate {

  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
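    // Group-windows whose size is a time interval are mapped to DataStream
    // time-window assigners; group-windows whose size is a plain long are
    // mapped to count windows.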
    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))

    case ProcessingTimeTumblingGroupWindow(_, size) =>
      stream.countWindow(asCount(size))

    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
      stream.window(TumblingEventTimeWindows.of(asTime(size)))

    case EventTimeTumblingGroupWindow(_, _, size) =>
      stream.countWindow(asCount(size))

    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))

    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
      stream.countWindow(asCount(size), asCount(slide))

    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))

    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
      stream.countWindow(asCount(size), asCount(slide))

    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))

    case EventTimeSessionGroupWindow(_, _, gap) =>
      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
  }

  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {

    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))

    case ProcessingTimeTumblingGroupWindow(_, size) =>
      stream.countWindowAll(asCount(size))

    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))

    case EventTimeTumblingGroupWindow(_, _, size) =>
      stream.countWindowAll(asCount(size))
--- End diff --

    sort on event-time.

> Add group-windows for streaming tables
> ---------------------------------------
>
>                 Key: FLINK-4691
>                 URL: https://issues.apache.org/jira/browse/FLINK-4691
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be initially supported and will throw an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
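For reference, a minimal hand-written DataStream job that roughly corresponds to what translateToPlan assembles for a keyed tumbling processing-time window. This is a sketch, not part of the patch: the input data, the key and value fields, and the job name are illustrative assumptions.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object GroupWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Illustrative bounded input of (key, value) pairs; a real job would read
    // from a streaming source.
    val input = env.fromElements(("a", 1L), ("a", 2L), ("b", 3L))

    // Roughly the shape of the translated plan for
    //   groupBy: (key), window: (tumbling, 10s, processing-time), select: (key, SUM(value)):
    // keyBy on the grouping fields, a window assigner, and an aggregation.
    val result = input
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)

    result.print()
    env.execute("group-window sketch")
  }
}

The group-windows of FLIP-11 let a Table API query express this pattern declaratively; DataStreamAggregate is the plan node that performs the translation.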