Repository: flink Updated Branches: refs/heads/master 5ff9c99ff -> c5173fa26
http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala new file mode 100644 index 0000000..525d4d7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
+import org.slf4j.LoggerFactory
+
+
+/**
+  * Base ProcessFunction for unbounded event-time OVER windows.
+  *
+  * Rows are buffered in map state, grouped by their timestamp, until an event-time timer
+  * fires. The timer callback then walks all buffered timestamps that are smaller than or
+  * equal to the current watermark in ascending order and hands the rows of each timestamp to
+  * `processElementsWithSameTimestamp`, which the ROWS and RANGE subclasses implement
+  * differently.
+  *
+  * @param genAggregations  Generated aggregate helper function
+  * @param intermediateType the type of the accumulator row that is kept in state
+  * @param inputType        the type of the input rows that are buffered in state
+  */
+abstract class RowTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]
+  with Compiler[GeneratedAggregations] {
+
+  // output row that is reused for every emitted record; created in open()
+  protected var output: Row = _
+  // state to hold the accumulators of the aggregations
+  private var accumulatorState: ValueState[Row] = _
+  // state to hold rows until the next watermark arrives
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  // list to sort timestamps to access rows in timestamp order
+  private var sortedTimestamps: util.LinkedList[Long] = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  // generated aggregation helper; compiled and instantiated in open()
+  protected var function: GeneratedAggregations = _
+
+  /**
+    * Compiles and instantiates the generated aggregation helper, creates the reusable
+    * output row and registers the accumulator and row-buffer state.
+    *
+    * @param config The configuration passed by the runtime (not read here).
+    */
+  override def open(config: Configuration): Unit = {
+    // ${...} is required to interpolate the members; "$genAggregations.name" would print
+    // the whole object followed by the literal text ".name"
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = function.createOutputRow()
+    sortedTimestamps = new util.LinkedList[Long]()
+
+    // initialize accumulator state
+    val accDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+    accumulatorState = getRuntimeContext.getState[Row](accDescriptor)
+
+    // initialize row state
+    val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType)
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  /**
+    * Puts an element from the input stream into state if it is not late.
+    * Registers a timer for the next watermark.
+    *
+    * @param input The input value.
+    * @param ctx   The ctx to register timer or get current time
+    * @param out   The collector for returning result values (not used here, rows are
+    *              emitted from onTimer).
+    */
+  override def processElement(
+      input: Row,
+      ctx: ProcessFunction[Row, Row]#Context,
+      out: Collector[Row]): Unit = {
+
+    val timestamp = ctx.timestamp()
+    val curWatermark = ctx.timerService().currentWatermark()
+
+    // discard late records. onTimer() emits every buffered row with a timestamp <= watermark,
+    // so a row at exactly the watermark is late as well: accepting it would emit it after
+    // other rows of the same timestamp. The strict comparison also guarantees that
+    // curWatermark + 1 below cannot overflow.
+    if (timestamp > curWatermark) {
+      // ensure every key just registers one timer
+      ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+
+      // put row into state
+      var rowList = rowMapState.get(timestamp)
+      if (rowList == null) {
+        rowList = new util.ArrayList[Row]()
+      }
+      rowList.add(input)
+      rowMapState.put(timestamp, rowList)
+    }
+  }
+
+  /**
+    * Called when a watermark arrived.
+    * Sorts the buffered timestamps, computes aggregates, and emits all records with a
+    * timestamp smaller than or equal to the watermark in timestamp order.
+    *
+    * @param timestamp The timestamp of the firing timer.
+    * @param ctx       The ctx to register timer or get current time
+    * @param out       The collector for returning result values; must be a
+    *                  TimestampedCollector.
+    */
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[Row, Row]#OnTimerContext,
+      out: Collector[Row]): Unit = {
+
+    // setAbsoluteTimestamp() is needed below, so a plain Collector is rejected
+    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]])
+    val collector = out.asInstanceOf[TimestampedCollector[Row]]
+
+    val keyIterator = rowMapState.keys.iterator
+    if (keyIterator.hasNext) {
+      val curWatermark = ctx.timerService.currentWatermark
+      var existEarlyRecord: Boolean = false
+
+      // sort the record timestamps
+      while (keyIterator.hasNext) {
+        val recordTime = keyIterator.next
+        // only take timestamps smaller/equal to the watermark
+        if (recordTime <= curWatermark) {
+          insertToSortedList(recordTime)
+        } else {
+          existEarlyRecord = true
+        }
+      }
+
+      // get last accumulator
+      var lastAccumulator = accumulatorState.value
+      if (lastAccumulator == null) {
+        // initialize accumulator
+        lastAccumulator = function.createAccumulators()
+      }
+
+      // emit the rows in order
+      while (!sortedTimestamps.isEmpty) {
+        val curTimestamp = sortedTimestamps.removeFirst()
+        val curRowList = rowMapState.get(curTimestamp)
+        collector.setAbsoluteTimestamp(curTimestamp)
+
+        // process the rows with the same timestamp, the mechanism differs for ROWS and RANGE
+        processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
+
+        // the rows of this timestamp have been emitted and are no longer needed
+        rowMapState.remove(curTimestamp)
+      }
+
+      accumulatorState.update(lastAccumulator)
+
+      // if there are rows with timestamp > watermark, register a timer for the next watermark
+      if (existEarlyRecord) {
+        ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+      }
+    }
+  }
+
+  /**
+    * Inserts a timestamp into the sorted linked list, keeping ascending order.
+    *
+    * The list is scanned from its end, so if timestamps arrive in order (as in case of using
+    * the RocksDB state backend) this is just an append with O(1).
+    *
+    * @param recordTimestamp The timestamp to insert.
+    */
+  private def insertToSortedList(recordTimestamp: Long): Unit = {
+    val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size)
+    var continue = true
+    while (listIterator.hasPrevious && continue) {
+      val timestamp = listIterator.previous
+      if (recordTimestamp >= timestamp) {
+        // step forward again so that the new timestamp is added behind the smaller/equal one
+        listIterator.next
+        listIterator.add(recordTimestamp)
+        continue = false
+      }
+    }
+
+    // no smaller/equal timestamp was found (or the list is empty): insert at the head
+    if (continue) {
+      sortedTimestamps.addFirst(recordTimestamp)
+    }
+  }
+
+  /**
+    * Processes all rows that have the same timestamp, the mechanism is different between
+    * ROWS and RANGE windows.
+    *
+    * @param curRowList      The rows that share the current timestamp.
+    * @param lastAccumulator The accumulator row, which is updated in place.
+    * @param out             The collector for returning result values.
+    */
+  def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit
+
+}
+
+/**
+  * A ProcessFunction to support unbounded ROWS window.
+  * The ROWS clause defines on a physical level how many rows are included in a window frame.
+  */
+class RowTimeUnboundedRowsOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends RowTimeUnboundedOver(
+    genAggregations,
+    intermediateType,
+    inputType) {
+
+  /**
+    * Accumulates and emits the rows one by one, so every row gets its own aggregate even
+    * if it shares the timestamp with other rows.
+    */
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var i = 0
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      // copy forwarded fields to output row
+      function.setForwardedFields(curRow, output)
+
+      // update accumulators and copy aggregates to output row
+      function.accumulate(lastAccumulator, curRow)
+      function.setAggregationResults(lastAccumulator, output)
+      // emit output row
+      out.collect(output)
+      i += 1
+    }
+  }
+}
+
+
+/**
+  * A ProcessFunction to support unbounded RANGE window.
+  * The RANGE option includes all the rows within the window frame
+  * that have the same ORDER BY values as the current row.
+  */
+class RowTimeUnboundedRangeOver(
+    genAggregations: GeneratedAggregationsFunction,
+    intermediateType: TypeInformation[Row],
+    inputType: TypeInformation[Row])
+  extends RowTimeUnboundedOver(
+    genAggregations,
+    intermediateType,
+    inputType) {
+
+  /**
+    * Accumulates all rows of the timestamp first and emits them afterwards, so all rows
+    * with the same timestamp carry the same aggregate.
+    */
+  override def processElementsWithSameTimestamp(
+    curRowList: JList[Row],
+    lastAccumulator: Row,
+    out: Collector[Row]): Unit = {
+
+    var i = 0
+    // all same timestamp data should have same aggregation value.
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      function.accumulate(lastAccumulator, curRow)
+      i += 1
+    }
+
+    // emit output row
+    i = 0
+    while (i < curRowList.size) {
+      val curRow = curRowList.get(i)
+
+      // copy forwarded fields to output row
+      function.setForwardedFields(curRow, output)
+
+      // copy aggregates to output row
+      function.setAggregationResults(lastAccumulator, output)
+      out.collect(output)
+      i += 1
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
deleted file mode 100644
index 4539164..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.util -import java.util.{List => JList} - -import org.apache.flink.api.common.state._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.types.Row -import org.apache.flink.util.{Collector, Preconditions} -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} -import org.slf4j.LoggerFactory - -/** - * Process Function for ROWS clause event-time bounded OVER window - * - * @param genAggregations Generated aggregate helper function - * @param aggregationStateType row type info of aggregation - * @param inputRowType row type info of input row - * @param precedingOffset preceding offset - */ -class RowsClauseBoundedOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - aggregationStateType: RowTypeInfo, - inputRowType: RowTypeInfo, - precedingOffset: Long) - extends ProcessFunction[Row, Row] - with Compiler[GeneratedAggregations] { - - Preconditions.checkNotNull(aggregationStateType) - Preconditions.checkNotNull(precedingOffset) - - private var output: Row = _ - - // the state which keeps the last triggering timestamp - private var 
lastTriggeringTsState: ValueState[Long] = _ - - // the state which keeps the count of data - private var dataCountState: ValueState[Long] = _ - - // the state which used to materialize the accumulator for incremental calculation - private var accumulatorState: ValueState[Row] = _ - - // the state which keeps all the data that are not expired. - // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp, - // the second element of tuple is a list that contains the entire data of all the rows belonging - // to this time stamp. - private var dataState: MapState[Long, JList[Row]] = _ - - val LOG = LoggerFactory.getLogger(this.getClass) - private var function: GeneratedAggregations = _ - - override def open(config: Configuration) { - LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + - s"Code:\n$genAggregations.code") - val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - genAggregations.name, - genAggregations.code) - LOG.debug("Instantiating AggregateHelper.") - function = clazz.newInstance() - - output = function.createOutputRow() - - val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) - lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor) - - val dataCountStateDescriptor = - new ValueStateDescriptor[Long]("dataCountState", classOf[Long]) - dataCountState = getRuntimeContext.getState(dataCountStateDescriptor) - - val accumulatorStateDescriptor = - new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType) - accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor) - - val keyTypeInformation: TypeInformation[Long] = - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] - val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType) - - val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new 
MapStateDescriptor[Long, JList[Row]]( - "dataState", - keyTypeInformation, - valueTypeInformation) - - dataState = getRuntimeContext.getMapState(mapStateDescriptor) - } - - override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { - - // triggering timestamp for trigger calculation - val triggeringTs = ctx.timestamp - - val lastTriggeringTs = lastTriggeringTsState.value - // check if the data is expired, if not, save the data and register event time timer - - if (triggeringTs > lastTriggeringTs) { - val data = dataState.get(triggeringTs) - if (null != data) { - data.add(input) - dataState.put(triggeringTs, data) - } else { - val data = new util.ArrayList[Row] - data.add(input) - dataState.put(triggeringTs, data) - // register event time timer - ctx.timerService.registerEventTimeTimer(triggeringTs) - } - } - } - - override def onTimer( - timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { - - // gets all window data from state for the calculation - val inputs: JList[Row] = dataState.get(timestamp) - - if (null != inputs) { - - var accumulators = accumulatorState.value - var dataCount = dataCountState.value - - var retractList: JList[Row] = null - var retractTs: Long = Long.MaxValue - var retractCnt: Int = 0 - var i = 0 - - while (i < inputs.size) { - val input = inputs.get(i) - - // initialize when first run or failover recovery per key - if (null == accumulators) { - accumulators = function.createAccumulators() - } - - var retractRow: Row = null - - if (dataCount >= precedingOffset) { - if (null == retractList) { - // find the smallest timestamp - retractTs = Long.MaxValue - val dataTimestampIt = dataState.keys.iterator - while (dataTimestampIt.hasNext) { - val dataTs = dataTimestampIt.next - if (dataTs < retractTs) { - retractTs = dataTs - } - } - // get the oldest rows to retract them - retractList = dataState.get(retractTs) - } - - retractRow = 
retractList.get(retractCnt) - retractCnt += 1 - - // remove retracted values from state - if (retractList.size == retractCnt) { - dataState.remove(retractTs) - retractList = null - retractCnt = 0 - } - } else { - dataCount += 1 - } - - // copy forwarded fields to output row - function.setForwardedFields(input, output) - - // retract old row from accumulators - if (null != retractRow) { - function.retract(accumulators, retractRow) - } - - // accumulate current row and set aggregate in output row - function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) - i += 1 - - out.collect(output) - } - - // update all states - if (dataState.contains(retractTs)) { - if (retractCnt > 0) { - retractList.subList(0, retractCnt).clear() - dataState.put(retractTs, retractList) - } - } - dataCountState.update(dataCount) - accumulatorState.update(accumulators) - } - - lastTriggeringTsState.update(timestamp) - } -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala deleted file mode 100644 index cca3e3f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import java.util -import java.util.{List => JList} - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.configuration.Configuration -import org.apache.flink.types.Row -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.util.{Collector, Preconditions} -import org.apache.flink.api.common.state._ -import org.apache.flink.api.java.typeutils.ListTypeInfo -import org.apache.flink.streaming.api.operators.TimestampedCollector -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} -import org.slf4j.LoggerFactory - - -/** - * A ProcessFunction to support unbounded event-time over-window - * - * @param genAggregations Generated aggregate helper function - * @param intermediateType the intermediate row tye which the state saved - * @param inputType the input row tye which the state saved - */ -abstract class UnboundedEventTimeOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) - extends ProcessFunction[Row, Row] - with Compiler[GeneratedAggregations] { - - protected var output: Row = _ - // state to hold the accumulators of the aggregations - private var accumulatorState: ValueState[Row] = _ - // state to hold rows until the next 
watermark arrives - private var rowMapState: MapState[Long, JList[Row]] = _ - // list to sort timestamps to access rows in timestamp order - private var sortedTimestamps: util.LinkedList[Long] = _ - - val LOG = LoggerFactory.getLogger(this.getClass) - protected var function: GeneratedAggregations = _ - - override def open(config: Configuration) { - LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + - s"Code:\n$genAggregations.code") - val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - genAggregations.name, - genAggregations.code) - LOG.debug("Instantiating AggregateHelper.") - function = clazz.newInstance() - - output = function.createOutputRow() - sortedTimestamps = new util.LinkedList[Long]() - - // initialize accumulator state - val accDescriptor: ValueStateDescriptor[Row] = - new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) - accumulatorState = getRuntimeContext.getState[Row](accDescriptor) - - // initialize row state - val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType) - val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]]("rowmapstate", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) - rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) - } - - /** - * Puts an element from the input stream into state if it is not late. - * Registers a timer for the next watermark. - * - * @param input The input value. - * @param ctx The ctx to register timer or get current time - * @param out The collector for returning result values. 
- * - */ - override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { - - val timestamp = ctx.timestamp() - val curWatermark = ctx.timerService().currentWatermark() - - // discard late record - if (timestamp >= curWatermark) { - // ensure every key just registers one timer - ctx.timerService.registerEventTimeTimer(curWatermark + 1) - - // put row into state - var rowList = rowMapState.get(timestamp) - if (rowList == null) { - rowList = new util.ArrayList[Row]() - } - rowList.add(input) - rowMapState.put(timestamp, rowList) - } - } - - /** - * Called when a watermark arrived. - * Sorts records according the timestamp, computes aggregates, and emits all records with - * timestamp smaller than the watermark in timestamp order. - * - * @param timestamp The timestamp of the firing timer. - * @param ctx The ctx to register timer or get current time - * @param out The collector for returning result values. - */ - override def onTimer( - timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { - - Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]]) - val collector = out.asInstanceOf[TimestampedCollector[Row]] - - val keyIterator = rowMapState.keys.iterator - if (keyIterator.hasNext) { - val curWatermark = ctx.timerService.currentWatermark - var existEarlyRecord: Boolean = false - - // sort the record timestamps - do { - val recordTime = keyIterator.next - // only take timestamps smaller/equal to the watermark - if (recordTime <= curWatermark) { - insertToSortedList(recordTime) - } else { - existEarlyRecord = true - } - } while (keyIterator.hasNext) - - // get last accumulator - var lastAccumulator = accumulatorState.value - if (lastAccumulator == null) { - // initialize accumulator - lastAccumulator = function.createAccumulators() - } - - // emit the rows in order - while (!sortedTimestamps.isEmpty) { - val curTimestamp = sortedTimestamps.removeFirst() 
- val curRowList = rowMapState.get(curTimestamp) - collector.setAbsoluteTimestamp(curTimestamp) - - // process the same timestamp datas, the mechanism is different according ROWS or RANGE - processElementsWithSameTimestamp(curRowList, lastAccumulator, collector) - - rowMapState.remove(curTimestamp) - } - - accumulatorState.update(lastAccumulator) - - // if are are rows with timestamp > watermark, register a timer for the next watermark - if (existEarlyRecord) { - ctx.timerService.registerEventTimeTimer(curWatermark + 1) - } - } - } - - /** - * Inserts timestamps in order into a linked list. - * - * If timestamps arrive in order (as in case of using the RocksDB state backend) this is just - * an append with O(1). - */ - private def insertToSortedList(recordTimestamp: Long) = { - val listIterator = sortedTimestamps.listIterator(sortedTimestamps.size) - var continue = true - while (listIterator.hasPrevious && continue) { - val timestamp = listIterator.previous - if (recordTimestamp >= timestamp) { - listIterator.next - listIterator.add(recordTimestamp) - continue = false - } - } - - if (continue) { - sortedTimestamps.addFirst(recordTimestamp) - } - } - - /** - * Process the same timestamp datas, the mechanism is different between - * rows and range window. - */ - def processElementsWithSameTimestamp( - curRowList: JList[Row], - lastAccumulator: Row, - out: Collector[Row]): Unit - -} - -/** - * A ProcessFunction to support unbounded ROWS window. - * The ROWS clause defines on a physical level how many rows are included in a window frame. 
- */ -class UnboundedEventTimeRowsOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) - extends UnboundedEventTimeOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - intermediateType, - inputType) { - - override def processElementsWithSameTimestamp( - curRowList: JList[Row], - lastAccumulator: Row, - out: Collector[Row]): Unit = { - - var i = 0 - while (i < curRowList.size) { - val curRow = curRowList.get(i) - - var j = 0 - // copy forwarded fields to output row - function.setForwardedFields(curRow, output) - - // update accumulators and copy aggregates to output row - function.accumulate(lastAccumulator, curRow) - function.setAggregationResults(lastAccumulator, output) - // emit output row - out.collect(output) - i += 1 - } - } -} - - -/** - * A ProcessFunction to support unbounded RANGE window. - * The RANGE option includes all the rows within the window frame - * that have the same ORDER BY values as the current row. - */ -class UnboundedEventTimeRangeOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) - extends UnboundedEventTimeOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - intermediateType, - inputType) { - - override def processElementsWithSameTimestamp( - curRowList: JList[Row], - lastAccumulator: Row, - out: Collector[Row]): Unit = { - - var i = 0 - // all same timestamp data should have same aggregation value. 
- while (i < curRowList.size) { - val curRow = curRowList.get(i) - - function.accumulate(lastAccumulator, curRow) - i += 1 - } - - // emit output row - i = 0 - while (i < curRowList.size) { - val curRow = curRowList.get(i) - - // copy forwarded fields to output row - function.setForwardedFields(curRow, output) - - //copy aggregates to output row - function.setAggregationResults(lastAccumulator, output) - out.collect(output) - i += 1 - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala deleted file mode 100644 index 1a8399b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.types.Row -import org.apache.flink.util.Collector -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} -import org.slf4j.LoggerFactory - -/** - * Process Function for non-partitioned processing-time unbounded OVER window - * - * @param genAggregations Generated aggregate helper function - * @param aggregationStateType row type info of aggregation - */ -class UnboundedNonPartitionedProcessingOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - aggregationStateType: RowTypeInfo) - extends ProcessFunction[Row, Row] - with CheckpointedFunction - with Compiler[GeneratedAggregations] { - - private var accumulators: Row = _ - private var output: Row = _ - private var state: ListState[Row] = _ - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: GeneratedAggregations = _ - - override def open(config: Configuration) { - LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + - s"Code:\n$genAggregations.code") - val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - genAggregations.name, - genAggregations.code) - LOG.debug("Instantiating AggregateHelper.") - function = clazz.newInstance() - - output = function.createOutputRow() - if (null == accumulators) { - val it = state.get().iterator() - if (it.hasNext) { - accumulators = it.next() - } else { - accumulators = 
function.createAccumulators() - } - } - } - - override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { - - function.setForwardedFields(input, output) - - function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) - - out.collect(output) - } - - override def snapshotState(context: FunctionSnapshotContext): Unit = { - state.clear() - if (null != accumulators) { - state.add(accumulators) - } - } - - override def initializeState(context: FunctionInitializationContext): Unit = { - val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType) - state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala deleted file mode 100644 index 9a6d4f0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.aggregate - -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.types.Row -import org.apache.flink.util.Collector -import org.apache.flink.api.common.state.ValueStateDescriptor -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.api.common.state.ValueState -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} -import org.slf4j.LoggerFactory - -/** - * Process Function for processing-time unbounded OVER window - * - * @param genAggregations Generated aggregate helper function - * @param aggregationStateType row type info of aggregation - */ -class UnboundedProcessingOverProcessFunction( - genAggregations: GeneratedAggregationsFunction, - aggregationStateType: RowTypeInfo) - extends ProcessFunction[Row, Row] - with Compiler[GeneratedAggregations] { - - private var output: Row = _ - private var state: ValueState[Row] = _ - val LOG = LoggerFactory.getLogger(this.getClass) - private var function: GeneratedAggregations = _ - - override def open(config: Configuration) { - LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + - s"Code:\n$genAggregations.code") - val clazz = compile( - getRuntimeContext.getUserCodeClassLoader, - genAggregations.name, - genAggregations.code) - LOG.debug("Instantiating AggregateHelper.") - function = clazz.newInstance() - - output = function.createOutputRow() - val stateDescriptor: ValueStateDescriptor[Row] = - new 
ValueStateDescriptor[Row]("overState", aggregationStateType) - state = getRuntimeContext.getState(stateDescriptor) - } - - override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { - - var accumulators = state.value() - - if (null == accumulators) { - accumulators = function.createAccumulators() - } - - function.setForwardedFields(input, output) - - function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) - - state.update(accumulators) - - out.collect(output) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/07f1b035/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala index 25ec36e..3610898 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala @@ -165,7 +165,7 @@ class BoundedProcessingOverRangeProcessFunctionTest { val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode) val processFunction = new KeyedProcessOperator[String, Row, Row]( - new BoundedProcessingOverRangeProcessFunction( + new ProcTimeBoundedRangeOver( genAggFunction, 1000, aggregationStateType,