[FLINK-5047] [table] Add sliding group-windows for batch tables This closes #3364.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a57c5a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a57c5a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a57c5a Branch: refs/heads/master Commit: 31a57c5a89d6d22ccb629c2adfe4ffb87441e6dd Parents: bec818d Author: twalthr <twal...@apache.org> Authored: Wed Jan 18 16:56:02 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Wed Mar 8 17:01:27 2017 +0100 ---------------------------------------------------------------------- .../table/functions/AggregateFunction.scala | 8 +- .../nodes/dataset/DataSetWindowAggregate.scala | 117 ++++++++- .../table/runtime/aggregate/AggregateUtil.scala | 236 ++++++++++++++++--- ...SetSessionWindowAggReduceGroupFunction.scala | 201 ++++++++++++++++ ...sionWindowAggregateReduceGroupFunction.scala | 201 ---------------- ...taSetSlideTimeWindowAggFlatMapFunction.scala | 63 +++++ ...tSlideTimeWindowAggReduceGroupFunction.scala | 202 ++++++++++++++++ ...SetSlideWindowAggReduceCombineFunction.scala | 117 +++++++++ ...taSetSlideWindowAggReduceGroupFunction.scala | 141 +++++++++++ ...umbleCountWindowAggReduceGroupFunction.scala | 3 - ...TumbleTimeWindowAggReduceGroupFunction.scala | 3 +- .../aggregate/DataSetWindowAggMapFunction.scala | 112 +++++++++ .../DataSetWindowAggregateMapFunction.scala | 111 --------- .../IncrementalAggregateAllWindowFunction.scala | 7 +- .../scala/stream/table/AggregationsITCase.scala | 43 +--- .../dataset/DataSetWindowAggregateITCase.scala | 163 ++++++++++++- .../datastream/DataStreamAggregateITCase.scala | 235 ++++++++++++++++++ 17 files changed, 1566 insertions(+), 397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala index 967d2ea..773c71f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala @@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def getValue(accumulator: Accumulator): T /** - * Process the input values and update the provided accumulator instance. + * Processes the input values and update the provided accumulator instance. * * @param accumulator the accumulator which contains the current * aggregated results @@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { def accumulate(accumulator: Accumulator, input: Any): Unit /** - * Merge a list of accumulator instances into one accumulator instance. + * Merges a list of accumulator instances into one accumulator instance. * - * IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the + * IMPORTANT: You may only return a new accumulator instance or the first accumulator of the * input list. If you return another instance, the result of the aggregation function might be * incorrect. * @@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction { * * @return The type information for the accumulator. 
*/ - def getAccumulatorType(): TypeInformation[_] = null + def getAccumulatorType: TypeInformation[_] = null } /** http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala index fb5ff3b..a94deb1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala @@ -111,17 +111,25 @@ class DataSetWindowAggregate( // whether identifiers are matched case-sensitively val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive() + window match { case EventTimeTumblingGroupWindow(_, _, size) => createEventTimeTumblingWindowDataSet( inputDS, isTimeInterval(size.resultType), caseSensitive) + case EventTimeSessionGroupWindow(_, _, gap) => createEventTimeSessionWindowDataSet(inputDS, caseSensitive) - case EventTimeSlidingGroupWindow(_, _, _, _) => - throw new UnsupportedOperationException( - "Event-time sliding windows in a batch environment are currently not supported") + + case EventTimeSlidingGroupWindow(_, _, size, slide) => + createEventTimeSlidingWindowDataSet( + inputDS, + isTimeInterval(size.resultType), + asLong(size), + asLong(slide), + caseSensitive) + case _: ProcessingTimeGroupWindow => throw new UnsupportedOperationException( "Processing-time tumbling windows are not supported in a batch environment, " + @@ -130,7 +138,6 @@ class DataSetWindowAggregate( } } - private def createEventTimeTumblingWindowDataSet( inputDS: DataSet[Row], isTimeWindow: 
Boolean, @@ -312,6 +319,108 @@ class DataSetWindowAggregate( } } + private def createEventTimeSlidingWindowDataSet( + inputDS: DataSet[Row], + isTimeWindow: Boolean, + size: Long, + slide: Long, + isParserCaseSensitive: Boolean) + : DataSet[Row] = { + + // create MapFunction for initializing the aggregations + // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates + val mapFunction = createDataSetWindowPrepareMapFunction( + window, + namedAggregates, + grouping, + inputType, + isParserCaseSensitive) + + val mappedDataSet = inputDS + .map(mapFunction) + .name(prepareOperatorName) + + val mapReturnType = mappedDataSet.getType + + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val groupingKeys = grouping.indices.toArray + + // do partial aggregation if possible + val isPartial = doAllSupportPartialMerge( + namedAggregates.map(_.getKey), + inputType, + grouping.length) + + // only pre-tumble if it is worth it + val isLittleTumblingSize = determineLargestTumblingSize(size, slide) <= 1 + + val preparedDataSet = if (isTimeWindow) { + // time window + + if (isPartial && !isLittleTumblingSize) { + // partial aggregates + + val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1 + + // create GroupReduceFunction + // for pre-tumbling and replicating/omitting the content for each pane + val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction( + window, + namedAggregates, + grouping, + inputType, + isParserCaseSensitive) + + mappedDataSet.asInstanceOf[DataSet[Row]] + .groupBy(groupingKeysAndAlignedRowtime: _*) + .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits + .name(prepareOperatorName) + } else { + // non-partial aggregates + + // create FlatMapFunction + // for replicating/omitting the content for each pane + val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction( + window, + namedAggregates, + grouping, + mapReturnType, + 
isParserCaseSensitive) + + mappedDataSet + .flatMap(prepareFlatMapFunction) // replicates/omits + } + } else { + // count window + + throw new UnsupportedOperationException( + "Count sliding group windows on event-time are currently not supported.") + } + + val prepareReduceReturnType = preparedDataSet.getType + + // create GroupReduceFunction for final aggregation and conversion to output row + val aggregateReduceFunction = createDataSetWindowAggregationGroupReduceFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + grouping, + namedProperties, + isInputCombined = false) + + // gets the window-start position in the intermediate result. + val windowStartPos = prepareReduceReturnType.getArity - 1 + + val groupingKeysAndWindowStart = groupingKeys :+ windowStartPos + + preparedDataSet + .groupBy(groupingKeysAndWindowStart: _*) + .reduceGroup(aggregateReduceFunction) + .returns(rowTypeInfo) + .name(aggregateOperatorName) + } + private def prepareOperatorName: String = { val aggString = aggregationToString( inputType, http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index acb6cd0..4900b1b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -21,26 +21,26 @@ import java.util import org.apache.calcite.rel.`type`._ import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.sql.{SqlAggFunction, SqlKind} -import org.apache.calcite.sql.`type`.SqlTypeName._ import 
org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun._ -import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction} +import org.apache.calcite.sql.{SqlAggFunction, SqlKind} +import org.apache.flink.api.common.functions.{FlatMapFunction, GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.expressions._ -import org.apache.flink.table.plan.logical._ -import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.api.{TableException, Types} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions._ -import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ +import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction} +import org.apache.flink.table.plan.logical._ +import 
org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} import org.apache.flink.types.Row @@ -160,23 +160,37 @@ object AggregateUtil { groupings, aggregates, inputType, - Some(Array(Types.LONG))) + Some(Array(BasicTypeInfo.LONG_TYPE_INFO))) val (timeFieldPos, tumbleTimeWindowSize) = window match { + case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) => + val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive) + (timeFieldPos, Some(asLong(size))) + case EventTimeTumblingGroupWindow(_, time, size) => val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive) - size match { - case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => - (timeFieldPos, Some(value)) - case _ => (timeFieldPos, None) - } + (timeFieldPos, None) + case EventTimeSessionGroupWindow(_, time, _) => - (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None) + val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive) + (timeFieldPos, None) + + case EventTimeSlidingGroupWindow(_, time, size, slide) + if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) => + // pre-tumble incremental aggregates on time-windows + val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive) + val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide)) + (timeFieldPos, Some(preTumblingSize)) + + case EventTimeSlidingGroupWindow(_, time, _, _) => + val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive) + (timeFieldPos, None) + case _ => throw new UnsupportedOperationException(s"$window is currently not supported on batch") } - new DataSetWindowAggregateMapFunction( + new DataSetWindowAggMapFunction( aggregates, aggFieldIndexes, groupings, @@ -186,6 +200,116 @@ object AggregateUtil { } /** + * Create a 
[[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for + * partial aggregates of sliding windows (time and count-windows). + * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for + * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the + * window-start, and replicates or omits records for different panes of a sliding window. + * + * The output of the function contains the grouping keys, the intermediate aggregate values of + * all aggregate functions and the aligned window start. Window start need not be a timestamp, + * but can also be a count value for count-windows. + * + * The output is stored in Row by the following format: + * + * {{{ + * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 + * | | + * v v + * +---------+---------+--------+--------+--------+--------+-------------+ + * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | windowStart | + * +---------+---------+--------+--------+--------+--------+-------------+ + * ^ ^ + * | | + * sum(y) aggOffsetInRow = 4 window start for pane mapping + * }}} + * + * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+ */ + def createDataSetSlideWindowPrepareGroupReduceFunction( + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + groupings: Array[Int], + inputType: RelDataType, + isParserCaseSensitive: Boolean) + : RichGroupReduceFunction[Row, Row] = { + + val aggregates = transformToAggregateFunctions( + namedAggregates.map(_.getKey), + inputType, + needRetraction = false)._2 + + val returnType: RowTypeInfo = createDataSetAggregateBufferDataType( + groupings, + aggregates, + inputType, + Some(Array(BasicTypeInfo.LONG_TYPE_INFO))) + + window match { + case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) => + // sliding time-window for partial aggregations + new DataSetSlideTimeWindowAggReduceGroupFunction( + aggregates, + groupings.length, + returnType.getArity - 1, + asLong(size), + asLong(slide), + returnType) + + case _ => + throw new UnsupportedOperationException(s"$window is currently not supported on batch.") + } + } + + /** + * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for + * non-incremental aggregates of sliding windows (time-windows). + * + * It requires a prepared input (with intermediate aggregate fields), aligns the + * window-start, and replicates or omits records for different panes of a sliding window. + * + * The output of the function contains the grouping keys, the intermediate aggregate values of + * all aggregate function and the aligned window start. 
+ * + * The output is stored in Row by the following format: + * + * {{{ + * avg(x) aggOffsetInRow = 2 count(z) aggOffsetInRow = 5 + * | | + * v v + * +---------+---------+--------+--------+--------+--------+-------------+ + * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 | windowStart | + * +---------+---------+--------+--------+--------+--------+-------------+ + * ^ ^ + * | | + * sum(y) aggOffsetInRow = 4 window start for pane mapping + * }}} + * + * NOTE: this function is only used for time-based sliding windows on batch tables. + */ + def createDataSetSlideWindowPrepareFlatMapFunction( + window: LogicalWindow, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + groupings: Array[Int], + inputType: TypeInformation[Row], + isParserCaseSensitive: Boolean) + : FlatMapFunction[Row, Row] = { + + window match { + case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) => + new DataSetSlideTimeWindowAggFlatMapFunction( + inputType.getArity - 1, + asLong(size), + asLong(slide), + inputType) + + case _ => + throw new UnsupportedOperationException( + s"$window is currently not supported in a batch environment.") + } + } + + /** * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute window * aggregates on batch tables. If all aggregates support partial aggregation and is a time * window, the [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements @@ -203,10 +327,10 @@ object AggregateUtil { isInputCombined: Boolean = false) : RichGroupReduceFunction[Row, Row] = { - val aggregates = transformToAggregateFunctions( + val (aggFieldIndexes, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), inputType, - needRetraction = false)._2 + needRetraction = false) // the mapping relation between field index of intermediate aggregate Row and output Row. 
val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings) @@ -259,7 +383,7 @@ object AggregateUtil { case EventTimeSessionGroupWindow(_, _, gap) => val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) - new DataSetSessionWindowAggregateReduceGroupFunction( + new DataSetSessionWindowAggReduceGroupFunction( aggregates, groupingOffsetMapping, aggOffsetMapping, @@ -268,6 +392,42 @@ object AggregateUtil { endPos, asLong(gap), isInputCombined) + + case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) => + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + if (doAllSupportPartialMerge(aggregates)) { + // for partial aggregations + new DataSetSlideWindowAggReduceCombineFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + startPos, + endPos, + asLong(size)) + } + else { + // for non-partial aggregations + new DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + startPos, + endPos, + asLong(size)) + } + + case EventTimeSlidingGroupWindow(_, _, size, _) => + new DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupingOffsetMapping, + aggOffsetMapping, + outputType.getFieldCount, + None, + None, + asLong(size)) + case _ => throw new UnsupportedOperationException(s"$window is currently not supported on batch") } @@ -355,6 +515,7 @@ object AggregateUtil { needRetraction = false)._2 window match { + case EventTimeSessionGroupWindow(_, _, gap) => val combineReturnType: RowTypeInfo = createDataSetAggregateBufferDataType( @@ -368,6 +529,7 @@ object AggregateUtil { groupings, asLong(gap), combineReturnType) + case _ => throw new UnsupportedOperationException( s" [ ${window.getClass.getCanonicalName.split("\\.").last} ] is currently not " + @@ -662,7 +824,8 @@ object AggregateUtil { } val sqlTypeName = 
inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName aggregateCall.getAggregation match { - case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => { + + case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => if (needRetraction) { aggregates(index) = sqlTypeName match { case TINYINT => @@ -702,8 +865,8 @@ object AggregateUtil { throw new TableException("Sum aggregate does no support type:" + sqlType) } } - } - case _: SqlAvgAggFunction => { + + case _: SqlAvgAggFunction => aggregates(index) = sqlTypeName match { case TINYINT => new ByteAvgAggFunction @@ -722,8 +885,8 @@ object AggregateUtil { case sqlType: SqlTypeName => throw new TableException("Avg aggregate does no support type:" + sqlType) } - } - case sqlMinMaxFunction: SqlMinMaxAggFunction => { + + case sqlMinMaxFunction: SqlMinMaxAggFunction => aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) { if (needRetraction) { sqlTypeName match { @@ -815,9 +978,10 @@ object AggregateUtil { } } } - } + case _: SqlCountAggFunction => aggregates(index) = new CountAggFunction + case unSupported: SqlAggFunction => throw new TableException("unsupported Function: " + unSupported.getName) } @@ -833,7 +997,7 @@ object AggregateUtil { val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg => - val accType = agg.getAccumulatorType() + val accType = agg.getAccumulatorType if (accType != null) { accType } else { @@ -969,10 +1133,22 @@ object AggregateUtil { } } - private def asLong(expr: Expression): Long = expr match { + private[flink] def asLong(expr: Expression): Long = expr match { case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value case _ => throw new IllegalArgumentException() } + + private[flink] def determineLargestTumblingSize(size: Long, slide: Long): Long = { + if (slide > size) { + gcd(slide, size) + } else { + gcd(size, slide) + } + } + + private def gcd(a: Long, b: 
Long): Long = { + if (b == 0) a else gcd(b, a % b) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala new file mode 100644 index 0000000..1f19687 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable +import java.util.{ArrayList => JArrayList} + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.util.{Collector, Preconditions} + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window + * on batch. + * + * Note: + * + * This can handle two input types (depending if input is combined or not): + * + * 1. when partial aggregate is not supported, the input data structure of reduce is + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime| + * 2. when partial aggregate is supported, the input data structure of reduce is + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd| + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping The index mapping between aggregate function list and + * aggregated value index in output Row. + * @param finalRowArity The output row field count. + * @param finalRowWindowStartPos The relative window-start field position. + * @param finalRowWindowEndPos The relative window-end field position. + * @param gap Session time window gap. 
+ */ +class DataSetSessionWindowAggReduceGroupFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupKeysMapping: Array[(Int, Int)], + aggregateMapping: Array[(Int, Int)], + finalRowArity: Int, + finalRowWindowStartPos: Option[Int], + finalRowWindowEndPos: Option[Int], + gap: Long, + isInputCombined: Boolean) + extends RichGroupReduceFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var collector: TimeWindowPropertyCollector = _ + private val accumStartPos: Int = groupKeysMapping.length + private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2 + private val intermediateRowWindowStartPos = intermediateRowArity - 2 + private val intermediateRowWindowEndPos = intermediateRowArity - 1 + + val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { + new JArrayList[Accumulator](2) + } + + override def open(config: Configuration) { + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(groupKeysMapping) + aggregateBuffer = new Row(intermediateRowArity) + output = new Row(finalRowArity) + collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + + // init lists with two empty accumulators + for (i <- aggregates.indices) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).add(accumulator) + accumulatorList(i).add(accumulator) + } + } + + /** + * For grouped intermediate aggregate Rows, divide window according to the window-start + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate + * aggregated values output from aggregate buffer, and then set them into output + * Row based on the mapping relationship between intermediate aggregate data and output data. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. 
+ * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + + var windowStart: java.lang.Long = null + var windowEnd: java.lang.Long = null + var currentRowTime: java.lang.Long = null + + // reset first accumulator in merge list + for (i <- aggregates.indices) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + } + + val iterator = records.iterator() + + while (iterator.hasNext) { + val record = iterator.next() + currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long] + // initial traversal or opening a new window + if (null == windowEnd || + (null != windowEnd && currentRowTime > windowEnd)) { + + // calculate the current window and open a new window + if (null != windowEnd) { + // evaluate and emit the current window's result. + doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd) + + // reset first accumulator in list + for (i <- aggregates.indices) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + } + } else { + // set group keys value to final output. + groupKeysMapping.foreach { + case (after, previous) => + output.setField(after, record.getField(previous)) + } + } + + windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long] + } + + for (i <- aggregates.indices) { + // insert received accumulator into acc list + val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + } + + windowEnd = if (isInputCombined) { + // partial aggregate is supported + record.getField(intermediateRowWindowEndPos).asInstanceOf[Long] + } else { + // partial aggregate is not supported, window-end equals rowtime + gap + currentRowTime + gap + } + } + // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd) + } + + /** + * Evaluate and emit the data of the current window. + * + * @param out the collection of the aggregate results + * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for + * each aggregate + * @param windowStart the window's start attribute value is the min (rowtime) of all rows + * in the window. + * @param windowEnd the window's end property value is max (rowtime) + gap for all rows + * in the window. + */ + def doEvaluateAndCollect( + out: Collector[Row], + accumulatorList: Array[JArrayList[Accumulator]], + windowStart: Long, + windowEnd: Long): Unit = { + + // merge the accumulators and then get value for the final output + aggregateMapping.foreach { + case (after, previous) => + val agg = aggregates(previous) + output.setField(after, agg.getValue(accumulatorList(previous).get(0))) + } + + // adds TimeWindow properties to output then emit output + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { + collector.wrappedCollector = out + collector.windowStart = windowStart + collector.windowEnd = windowEnd + + collector.collect(output) + } else { + out.collect(output) + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala deleted file mode 100644 index ebef211..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala +++ 
/dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.lang.Iterable -import java.util.{ArrayList => JArrayList} - -import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.types.Row -import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} -import org.apache.flink.util.{Collector, Preconditions} - -/** - * It wraps the aggregate logic inside of - * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window - * on batch. - * - * Note: - * - * This can handle two input types (depending if input is combined or not): - * - * 1. when partial aggregate is not supported, the input data structure of reduce is - * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime| - * 2. when partial aggregate is supported, the input data structure of reduce is - * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd| - * - * @param aggregates The aggregate functions. - * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row - * and output Row. 
- * @param aggregateMapping The index mapping between aggregate function list and - * aggregated value index in output Row. - * @param finalRowArity The output row field count. - * @param finalRowWindowStartPos The relative window-start field position. - * @param finalRowWindowEndPos The relative window-end field position. - * @param gap Session time window gap. - */ -class DataSetSessionWindowAggregateReduceGroupFunction( - aggregates: Array[AggregateFunction[_ <: Any]], - groupKeysMapping: Array[(Int, Int)], - aggregateMapping: Array[(Int, Int)], - finalRowArity: Int, - finalRowWindowStartPos: Option[Int], - finalRowWindowEndPos: Option[Int], - gap: Long, - isInputCombined: Boolean) - extends RichGroupReduceFunction[Row, Row] { - - private var aggregateBuffer: Row = _ - private var output: Row = _ - private var collector: TimeWindowPropertyCollector = _ - private val accumStartPos: Int = groupKeysMapping.length - private val intermediateRowArity: Int = accumStartPos + aggregates.length + 2 - private val intermediateRowWindowStartPos = intermediateRowArity - 2 - private val intermediateRowWindowEndPos = intermediateRowArity - 1 - - val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { - new JArrayList[Accumulator](2) - } - - override def open(config: Configuration) { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(groupKeysMapping) - aggregateBuffer = new Row(intermediateRowArity) - output = new Row(finalRowArity) - collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) - - // init lists with two empty accumulators - for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).add(accumulator) - accumulatorList(i).add(accumulator) - } - } - - /** - * For grouped intermediate aggregate Rows, divide window according to the window-start - * and window-end, merge data (within a unified window) into an aggregate buffer, calculate - * 
aggregated values output from aggregate buffer, and then set them into output - * Row based on the mapping relationship between intermediate aggregate data and output data. - * - * @param records Grouped intermediate aggregate Rows iterator. - * @param out The collector to hand results to. - * - */ - override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { - - var windowStart: java.lang.Long = null - var windowEnd: java.lang.Long = null - var currentRowTime: java.lang.Long = null - - // reset first accumulator in merge list - for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).set(0, accumulator) - } - - val iterator = records.iterator() - - while (iterator.hasNext) { - val record = iterator.next() - currentRowTime = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long] - // initial traversal or opening a new window - if (null == windowEnd || - (null != windowEnd && currentRowTime > windowEnd)) { - - // calculate the current window and open a new window - if (null != windowEnd) { - // evaluate and emit the current window's result. - doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd) - - // reset first accumulator in list - for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).set(0, accumulator) - } - } else { - // set group keys value to final output. 
- groupKeysMapping.foreach { - case (after, previous) => - output.setField(after, record.getField(previous)) - } - } - - windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long] - } - - for (i <- aggregates.indices) { - // insert received accumulator into acc list - val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator] - accumulatorList(i).set(1, newAcc) - // merge acc list - val retAcc = aggregates(i).merge(accumulatorList(i)) - // insert result into acc list - accumulatorList(i).set(0, retAcc) - } - - windowEnd = if (isInputCombined) { - // partial aggregate is supported - record.getField(intermediateRowWindowEndPos).asInstanceOf[Long] - } else { - // partial aggregate is not supported, window-start equal rowtime + gap - currentRowTime + gap - } - } - // evaluate and emit the current window's result. - doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd) - } - - /** - * Evaluate and emit the data of the current window. - * - * @param out the collection of the aggregate results - * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for - * each aggregate - * @param windowStart the window's start attribute value is the min (rowtime) of all rows - * in the window. - * @param windowEnd the window's end property value is max (rowtime) + gap for all rows - * in the window. 
- */ - def doEvaluateAndCollect( - out: Collector[Row], - accumulatorList: Array[JArrayList[Accumulator]], - windowStart: Long, - windowEnd: Long): Unit = { - - // merge the accumulators and then get value for the final output - aggregateMapping.foreach { - case (after, previous) => - val agg = aggregates(previous) - output.setField(after, agg.getValue(accumulatorList(previous).get(0))) - } - - // adds TimeWindow properties to output then emit output - if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { - collector.wrappedCollector = out - collector.windowStart = windowStart - collector.windowEnd = windowEnd - - collector.collect(output) - } else { - out.collect(output) - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala new file mode 100644 index 0000000..5f37b8a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + + +/** + * It is used for sliding windows on batch for time-windows. It takes a prepared input row, + * aligns the window start, and replicates or omits records for different panes of a sliding + * window. It is used for non-partial aggregations. 
+ * + * @param windowSize window size of the sliding window + * @param windowSlide window slide of the sliding window + * @param returnType return type of this function + */ +class DataSetSlideTimeWindowAggFlatMapFunction( + private val timeFieldPos: Int, + private val windowSize: Long, + private val windowSlide: Long, + @transient private val returnType: TypeInformation[Row]) + extends RichFlatMapFunction[Row, Row] + with ResultTypeQueryable[Row] { + + override def flatMap(record: Row, out: Collector[Row]): Unit = { + val windowStart = record.getField(timeFieldPos).asInstanceOf[Long] + + // adopted from SlidingEventTimeWindows.assignWindows + var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide) + + // adopted from SlidingEventTimeWindows.assignWindows + while (start > windowStart - windowSize) { + record.setField(timeFieldPos, start) + out.collect(record) + start -= windowSlide + } + } + + override def getProducedType: TypeInformation[Row] = { + returnType + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala new file mode 100644 index 0000000..5db3acb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable +import java.util.{ArrayList => JArrayList} + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + +/** + * It is used for sliding windows on batch for time-windows. It takes a prepared input row (with + * aligned rowtime for pre-tumbling), pre-aggregates (pre-tumbles) rows, aligns the window start, + * and replicates or omits records for different panes of a sliding window. + * + * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], however, + * it does no final aggregate evaluation. It also includes the logic of + * [[DataSetSlideTimeWindowAggFlatMapFunction]]. 
+ * + * @param aggregates aggregate functions + * @param groupingKeysLength number of grouping keys + * @param timeFieldPos position of aligned time field + * @param windowSize window size of the sliding window + * @param windowSlide window slide of the sliding window + * @param returnType return type of this function + */ +class DataSetSlideTimeWindowAggReduceGroupFunction( + private val aggregates: Array[AggregateFunction[_ <: Any]], + private val groupingKeysLength: Int, + private val timeFieldPos: Int, + private val windowSize: Long, + private val windowSlide: Long, + @transient private val returnType: TypeInformation[Row]) + extends RichGroupReduceFunction[Row, Row] + with CombineFunction[Row, Row] + with ResultTypeQueryable[Row] { + + Preconditions.checkNotNull(aggregates) + + protected var intermediateRow: Row = _ + // add one field to store window start + protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1 + protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { + new JArrayList[Accumulator](2) + } + private val intermediateWindowStartPos: Int = intermediateRowArity - 1 + + override def open(config: Configuration) { + intermediateRow = new Row(intermediateRowArity) + + // init lists with two empty accumulators + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).add(accumulator) + accumulatorList(i).add(accumulator) + i += 1 + } + } + + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = 
record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // trigger tumbling evaluation + if (!iterator.hasNext) { + val windowStart = record.getField(timeFieldPos).asInstanceOf[Long] + + // adopted from SlidingEventTimeWindows.assignWindows + var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide) + + // skip preparing output if it is not necessary + if (start > windowStart - windowSize) { + + // set group keys + i = 0 + while (i < groupingKeysLength) { + intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set accumulators + i = 0 + while (i < aggregates.length) { + intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0)) + i += 1 + } + + // adopted from SlidingEventTimeWindows.assignWindows + while (start > windowStart - windowSize) { + intermediateRow.setField(intermediateWindowStartPos, start) + out.collect(intermediateRow) + start -= windowSlide + } + } + } + } + } + + override def combine(records: Iterable[Row]): Row = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + + // set group keys + i = 0 + while (i < groupingKeysLength) { + 
intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set accumulators + i = 0 + while (i < aggregates.length) { + intermediateRow.setField(groupingKeysLength + i, accumulatorList(i).get(0)) + i += 1 + } + + intermediateRow.setField(timeFieldPos, record.getField(timeFieldPos)) + + return intermediateRow + } + } + + // this code path should never be reached as we return before the loop finishes + // we need this to prevent a compiler error + throw new IllegalArgumentException("Group is empty. This should never happen.") + } + + override def getProducedType: TypeInformation[Row] = { + returnType + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala new file mode 100644 index 0000000..c11e86b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.CombineFunction +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row + +/** + * Wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]. + * + * It is used for sliding on batch for both time and count-windows. + * + * @param aggregates aggregate functions. + * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping index mapping between aggregate function list and aggregated value + * index in output Row. 
+ * @param finalRowArity output row field count + * @param finalRowWindowStartPos relative window-start position to last field of output row + * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param windowSize size of the window, used to determine window-end for output row + */ +class DataSetSlideWindowAggReduceCombineFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupKeysMapping: Array[(Int, Int)], + aggregateMapping: Array[(Int, Int)], + finalRowArity: Int, + finalRowWindowStartPos: Option[Int], + finalRowWindowEndPos: Option[Int], + windowSize: Long) + extends DataSetSlideWindowAggReduceGroupFunction( + aggregates, + groupKeysMapping, + aggregateMapping, + finalRowArity, + finalRowWindowStartPos, + finalRowWindowEndPos, + windowSize) + with CombineFunction[Row, Row] { + + private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1 + private val intermediateRow: Row = new Row(intermediateRowArity) + + override def combine(records: Iterable[Row]): Row = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + // set group keys + i = 0 + while (i < groupKeysMapping.length) { + intermediateRow.setField(i, record.getField(i)) + i += 1 + } + + // set the partial accumulated result + i = 0 + while (i < 
aggregates.length) { + intermediateRow.setField(groupKeysMapping.length + i, accumulatorList(i).get(0)) + i += 1 + } + + intermediateRow.setField(windowStartPos, record.getField(windowStartPos)) + + return intermediateRow + } + } + + // this code path should never be reached as we return before the loop finishes + // we need this to prevent a compiler error + throw new IllegalArgumentException("Group is empty. This should never happen.") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala new file mode 100644 index 0000000..e67fac0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable +import java.util.{ArrayList => JArrayList} + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. + * + * It is used for sliding on batch for both time and count-windows. + * + * @param aggregates aggregate functions. + * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row + * and output Row. + * @param aggregateMapping index mapping between aggregate function list and aggregated value + * index in output Row. + * @param finalRowArity output row field count + * @param finalRowWindowStartPos relative window-start position to last field of output row + * @param finalRowWindowEndPos relative window-end position to last field of output row + * @param windowSize size of the window, used to determine window-end for output row + */ +class DataSetSlideWindowAggReduceGroupFunction( + aggregates: Array[AggregateFunction[_ <: Any]], + groupKeysMapping: Array[(Int, Int)], + aggregateMapping: Array[(Int, Int)], + finalRowArity: Int, + finalRowWindowStartPos: Option[Int], + finalRowWindowEndPos: Option[Int], + windowSize: Long) + extends RichGroupReduceFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(groupKeysMapping) + + private var collector: TimeWindowPropertyCollector = _ + private var output: Row = _ + private val accumulatorStartPos: Int = groupKeysMapping.length + protected val windowStartPos: Int = accumulatorStartPos + aggregates.length + + val 
accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { + new JArrayList[Accumulator](2) + } + + override def open(config: Configuration) { + output = new Row(finalRowArity) + collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + + // init lists with two empty accumulators + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).add(accumulator) + accumulatorList(i).add(accumulator) + i += 1 + } + } + + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + + // reset first accumulator + var i = 0 + while (i < aggregates.length) { + val accumulator = aggregates(i).createAccumulator() + accumulatorList(i).set(0, accumulator) + i += 1 + } + + val iterator = records.iterator() + while (iterator.hasNext) { + val record = iterator.next() + + // accumulate + i = 0 + while (i < aggregates.length) { + // insert received accumulator into acc list + val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator] + accumulatorList(i).set(1, newAcc) + // merge acc list + val retAcc = aggregates(i).merge(accumulatorList(i)) + // insert result into acc list + accumulatorList(i).set(0, retAcc) + i += 1 + } + + // check if this record is the last record + if (!iterator.hasNext) { + // set group keys value to final output + i = 0 + while (i < groupKeysMapping.length) { + val mapping = groupKeysMapping(i) + output.setField(mapping._1, record.getField(mapping._2)) + i += 1 + } + + // get final aggregate value and set to output. 
+ i = 0 + while (i < aggregateMapping.length) { + val mapping = aggregateMapping(i) + val agg = aggregates(i) + val result = agg.getValue(accumulatorList(mapping._2).get(0)) + output.setField(mapping._1, result) + i += 1 + } + + // adds TimeWindow properties to output then emit output + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) { + collector.wrappedCollector = out + collector.windowStart = record.getField(windowStartPos).asInstanceOf[Long] + collector.windowEnd = collector.windowStart + windowSize + + collector.collect(output) + } else { + out.collect(output) + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala index 85df1d8..ecc945c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala @@ -47,10 +47,8 @@ class DataSetTumbleCountWindowAggReduceGroupFunction( private val finalRowArity: Int) extends RichGroupReduceFunction[Row, Row] { - private var aggregateBuffer: Row = _ private var output: Row = _ private val accumStartPos: Int = groupKeysMapping.length - private val intermediateRowArity: Int = accumStartPos + aggregates.length + 1 val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { new JArrayList[Accumulator](2) @@ -59,7 +57,6 @@ class 
DataSetTumbleCountWindowAggReduceGroupFunction( override def open(config: Configuration) { Preconditions.checkNotNull(aggregates) Preconditions.checkNotNull(groupKeysMapping) - aggregateBuffer = new Row(intermediateRowArity) output = new Row(finalRowArity) // init lists with two empty accumulators http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 7ce0bf1..674c078 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -113,11 +113,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( // get final aggregate value and set to output. 
aggregateMapping.foreach { - case (after, previous) => { + case (after, previous) => val agg = aggregates(previous) val result = agg.getValue(accumulatorList(previous).get(0)) output.setField(after, result) - } } // get window start timestamp http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala new file mode 100644 index 0000000..4a64c47 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.sql.Timestamp + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + + +/** + * This map function only works for windows on batch tables. The differences between this function + * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is this function + * append an (aligned) rowtime field to the end of the output row. + */ +class DataSetWindowAggMapFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val groupingKeys: Array[Int], + private val timeFieldPos: Int, // time field position in input row + private val tumbleTimeWindowSize: Option[Long], + @transient private val returnType: TypeInformation[Row]) + extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] { + + private var output: Row = _ + // rowtime index in the buffer output row + private var rowtimeIndex: Int = _ + + override def open(config: Configuration) { + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + // add one more arity to store rowtime + val partialRowLength = groupingKeys.length + aggregates.length + 1 + // set rowtime to the last field of the output row + rowtimeIndex = partialRowLength - 1 + output = new Row(partialRowLength) + } + + override def map(input: Row): Row = { + + for (i <- aggregates.indices) { + val agg = aggregates(i) + val fieldValue = input.getField(aggFields(i)) + val accumulator = 
agg.createAccumulator() + agg.accumulate(accumulator, fieldValue) + output.setField(groupingKeys.length + i, accumulator) + } + + for (i <- groupingKeys.indices) { + output.setField(i, input.getField(groupingKeys(i))) + } + + val timeField = input.getField(timeFieldPos) + val rowtime = getTimestamp(timeField) + if (tumbleTimeWindowSize.isDefined) { + // in case of tumble time window, align rowtime to window start to represent the window + output.setField( + rowtimeIndex, + TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get)) + } else { + // for session window and slide window + output.setField(rowtimeIndex, rowtime) + } + + output + } + + private def getTimestamp(timeField: Any): Long = { + timeField match { + case b: Byte => b.toLong + case t: Character => t.toLong + case s: Short => s.toLong + case i: Int => i.toLong + case l: Long => l + case f: Float => f.toLong + case d: Double => d.toLong + case s: String => s.toLong + case t: Timestamp => SqlFunctions.toLong(t) + case _ => + throw new RuntimeException( + s"Window time field doesn't support ${timeField.getClass} type currently") + } + } + + override def getProducedType: TypeInformation[Row] = { + returnType + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala deleted file mode 100644 index 68088fc..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.sql.Timestamp - -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.table.functions.AggregateFunction -import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions - - -/** - * This map function only works for windows on batch tables. The differences between this function - * and [[org.apache.flink.table.runtime.aggregate.AggregateMapFunction]] is this function - * append an (aligned) rowtime field to the end of the output row. 
- */ -class DataSetWindowAggregateMapFunction( - private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], - private val groupingKeys: Array[Int], - private val timeFieldPos: Int, // time field position in input row - private val tumbleTimeWindowSize: Option[Long], - @transient private val returnType: TypeInformation[Row]) - extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] { - - private var output: Row = _ - // rowtime index in the buffer output row - private var rowtimeIndex: Int = _ - - override def open(config: Configuration) { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggFields) - Preconditions.checkArgument(aggregates.length == aggFields.length) - // add one more arity to store rowtime - val partialRowLength = groupingKeys.length + aggregates.length + 1 - // set rowtime to the last field of the output row - rowtimeIndex = partialRowLength - 1 - output = new Row(partialRowLength) - } - - override def map(input: Row): Row = { - - for (i <- aggregates.indices) { - val agg = aggregates(i) - val fieldValue = input.getField(aggFields(i)) - val accumulator = agg.createAccumulator() - agg.accumulate(accumulator, fieldValue) - output.setField(groupingKeys.length + i, accumulator) - } - - for (i <- groupingKeys.indices) { - output.setField(i, input.getField(groupingKeys(i))) - } - - val timeField = input.getField(timeFieldPos) - val rowtime = getTimestamp(timeField) - if (tumbleTimeWindowSize.isDefined) { - // in case of tumble time window, align rowtime to window start to represent the window - output.setField( - rowtimeIndex, - TimeWindow.getWindowStartWithOffset(rowtime, 0L, tumbleTimeWindowSize.get)) - } else { - // for session window and slide window - output.setField(rowtimeIndex, rowtime) - } - - output - } - - private def getTimestamp(timeField: Any): Long = { - timeField match { - case b: Byte => b.toLong - case t: Character => t.toLong - case s: Short => s.toLong - case i: Int => 
i.toLong - case l: Long => l - case f: Float => f.toLong - case d: Double => d.toLong - case s: String => s.toLong - case t: Timestamp => t.getTime - case _ => - throw new RuntimeException( - s"Window time field doesn't support ${timeField.getClass} type currently") - } - } - - override def getProducedType: TypeInformation[Row] = { - returnType - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala index 00aba1f..13ac6a9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala @@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window]( if (iterator.hasNext) { val record = iterator.next() - out.collect(record) + var i = 0 + while (i < record.getArity) { + output.setField(i, record.getField(i)) + i += 1 + } + out.collect(output) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala index 818cd0e..3e7b66b 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala @@ -19,16 +19,16 @@ package org.apache.flink.table.api.scala.stream.table import org.apache.flink.api.scala._ -import org.apache.flink.types.Row -import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark -import org.apache.flink.table.api.scala.stream.utils.StreamITCase -import org.apache.flink.table.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark +import org.apache.flink.table.api.scala.stream.utils.StreamITCase +import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test @@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } - - @Test - def testEventTimeSlidingWindow(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - - val stream = env - .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'string) - - val windowedTable = table - .window(Slide 
over 10.milli every 5.milli on 'rowtime as 'w) - .groupBy('w, 'string) - .select('string, 'int.count, 'w.start, 'w.end, 'w.start) - - val results = windowedTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = Seq( - "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", - "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005", - "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01", - "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015", - "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", - "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", - "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } } -object GroupWindowITCase { +object AggregationsITCase { class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { override def checkAndGetNextWatermark(