[FLINK-6491] [table] Add QueryConfig and state clean up for over-windowed aggregates.
This closes #3863. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b61d153 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b61d153 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b61d153 Branch: refs/heads/release-1.3 Commit: 6b61d1539386388c1ff38183bb95e86ae70cc0f0 Parents: 003f81a Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Tue May 9 14:36:42 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri May 12 08:33:44 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 8 +- .../apache/flink/table/api/QueryConfig.scala | 102 ----- .../table/api/StreamTableEnvironment.scala | 38 +- .../table/api/java/StreamTableEnvironment.scala | 35 +- .../apache/flink/table/api/queryConfig.scala | 102 +++++ .../api/scala/StreamTableEnvironment.scala | 20 +- .../table/api/scala/TableConversions.scala | 13 +- .../org/apache/flink/table/api/table.scala | 6 +- .../plan/nodes/datastream/DataStreamCalc.scala | 5 +- .../nodes/datastream/DataStreamCorrelate.scala | 4 +- .../datastream/DataStreamGroupAggregate.scala | 10 +- .../DataStreamGroupWindowAggregate.scala | 4 +- .../datastream/DataStreamOverAggregate.scala | 34 +- .../plan/nodes/datastream/DataStreamRel.scala | 4 +- .../plan/nodes/datastream/DataStreamScan.scala | 2 +- .../plan/nodes/datastream/DataStreamUnion.scala | 6 +- .../nodes/datastream/DataStreamValues.scala | 2 +- .../datastream/StreamTableSourceScan.scala | 2 +- .../table/runtime/aggregate/AggregateUtil.scala | 32 +- .../aggregate/GroupAggProcessFunction.scala | 44 +- .../aggregate/ProcTimeBoundedRangeOver.scala | 26 +- .../aggregate/ProcTimeBoundedRowsOver.scala | 20 +- .../ProcTimeUnboundedNonPartitionedOver.scala | 20 +- .../ProcTimeUnboundedPartitionedOver.scala | 20 +- .../ProcessFunctionWithCleanupState.scala | 85 ++++ .../aggregate/RowTimeBoundedRangeOver.scala | 44 +- .../aggregate/RowTimeBoundedRowsOver.scala | 41 +- .../aggregate/RowTimeUnboundedOver.scala | 44 +- .../stream/table/GroupAggregationsITCase.scala | 13 +- ...ProcessingOverRangeProcessFunctionTest.scala | 336 -------------- .../table/runtime/harness/HarnessTestBase.scala | 281 +++++++++++- .../runtime/harness/NonWindowHarnessTest.scala | 157 +++++++ .../runtime/harness/OverWindowHarnessTest.scala | 458 ++++++++++--------- .../table/utils/MockTableEnvironment.scala | 2 +- 34 files changed, 1209 insertions(+), 811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index f33c187..3c0f51b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -113,17 +113,17 @@ abstract class BatchTableEnvironment( * * @param table The [[Table]] to write. * @param sink The [[TableSink]] to write the [[Table]] to. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. */ override private[flink] def writeToSink[T]( table: Table, sink: TableSink[T], - qConfig: QueryConfig): Unit = { + queryConfig: QueryConfig): Unit = { // We do not pass the configuration on, because there is nothing to configure for batch queries. - val bQConfig = qConfig match { - case batchConfig: BatchQueryConfig => batchConfig + queryConfig match { + case _: BatchQueryConfig => case _ => throw new TableException("BatchQueryConfig required to configure batch query.") } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala deleted file mode 100644 index 8e8b5ac..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api - -import _root_.java.io.Serializable -import org.apache.flink.api.common.time.Time - -class QueryConfig private[table] extends Serializable {} - -/** - * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries. - */ -class BatchQueryConfig private[table] extends QueryConfig - -/** - * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries. - * - * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]] - * method. - */ -class StreamQueryConfig private[table] extends QueryConfig { - - /** - * The minimum time until state which was not updated will be retained. - * State might be cleared and removed if it was not updated for the defined period of time. - */ - private var minIdleStateRetentionTime: Long = Long.MinValue - - /** - * The maximum time until state which was not updated will be retained. - * State will be cleared and removed if it was not updated for the defined period of time. - */ - private var maxIdleStateRetentionTime: Long = Long.MinValue - - /** - * Specifies the time interval for how long idle state, i.e., state which was not updated, will - * be retained. When state was not updated for the specified interval of time, it will be cleared - * and removed. - * - * When new data arrives for previously cleaned-up state, the new data will be handled as if it - * was the first data. This can result in previous results being overwritten. - * - * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and - * maximum time for state to be retained. This method is more efficient, because the system has - * to do less bookkeeping to identify the time at which state must be cleared. - * - * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never - * clean-up the state. - */ - def setIdleStateRetentionTime(time: Time): StreamQueryConfig = { - setIdleStateRetentionTime(time, time) - } - - /** - * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which - * was not updated, will be retained. - * State will never be cleared until it was idle for less than the minimum time and will never - * be kept if it was idle for more than the maximum time. - * - * When new data arrives for previously cleaned-up state, the new data will be handled as if it - * was the first data. This can result in previous results being overwritten. - * - * Set to 0 (zero) to never clean-up the state. - * - * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to - * never clean-up the state. - * @param maxTime The maximum time interval for which idle state is retained. May not be smaller - * than than minTime. Set to 0 (zero) to never clean-up the state. - */ - def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = { - if (maxTime.toMilliseconds < minTime.toMilliseconds) { - throw new IllegalArgumentException("maxTime may not be smaller than minTime.") - } - minIdleStateRetentionTime = minTime.toMilliseconds - maxIdleStateRetentionTime = maxTime.toMilliseconds - this - } - - def getMinIdleStateRetentionTime: Long = { - minIdleStateRetentionTime - } - - def getMaxIdleStateRetentionTime: Long = { - maxIdleStateRetentionTime - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index c594d4c..d68da04 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -81,7 +81,7 @@ abstract class StreamTableEnvironment( // the naming pattern for internally registered tables. private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r - def qConf: StreamQueryConfig = new StreamQueryConfig + def queryConfig: StreamQueryConfig = new StreamQueryConfig /** * Checks if the chosen table name is valid. @@ -128,16 +128,16 @@ abstract class StreamTableEnvironment( * * @param table The [[Table]] to write. * @param sink The [[TableSink]] to write the [[Table]] to. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ override private[flink] def writeToSink[T]( table: Table, sink: TableSink[T], - qConfig: QueryConfig): Unit = { + queryConfig: QueryConfig): Unit = { // Check query configuration - val sQConf = qConfig match { + val streamQueryConfig = queryConfig match { case streamConfig: StreamQueryConfig => streamConfig case _ => throw new TableException("StreamQueryConfig required to configure stream query.") @@ -150,7 +150,11 @@ abstract class StreamTableEnvironment( val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. val result: DataStream[T] = - translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType) + translate( + table, + streamQueryConfig, + updatesAsRetraction = true, + withChangeFlag = true)(outputType) // Give the DataStream to the TableSink to emit it. retractSink.asInstanceOf[RetractStreamTableSink[Any]] .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) @@ -176,7 +180,7 @@ abstract class StreamTableEnvironment( translate( optimizedPlan, table.getRelNode.getRowType, - sQConf, + streamQueryConfig, withChangeFlag = true)(outputType) // Give the DataStream to the TableSink to emit it. upsertSink.asInstanceOf[UpsertStreamTableSink[Any]] @@ -196,7 +200,7 @@ abstract class StreamTableEnvironment( translate( optimizedPlan, table.getRelNode.getRowType, - sQConf, + streamQueryConfig, withChangeFlag = false)(outputType) // Give the DataStream to the TableSink to emit it. appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result) @@ -566,7 +570,7 @@ abstract class StreamTableEnvironment( * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. * * @param table The root node of the relational expression tree. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @param updatesAsRetraction Set to true to encode updates as retraction messages. * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. @@ -575,12 +579,12 @@ abstract class StreamTableEnvironment( */ protected def translate[A]( table: Table, - qConfig: StreamQueryConfig, + queryConfig: StreamQueryConfig, updatesAsRetraction: Boolean, withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = { val relNode = table.getRelNode val dataStreamPlan = optimize(relNode, updatesAsRetraction) - translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag) + translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag) } /** @@ -589,7 +593,7 @@ abstract class StreamTableEnvironment( * @param logicalPlan The root node of the relational expression tree. * @param logicalType The row type of the result. Since the logicalPlan can lose the * field naming during optimization we pass the row type separately. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. @@ -598,7 +602,7 @@ abstract class StreamTableEnvironment( protected def translate[A]( logicalPlan: RelNode, logicalType: RelDataType, - qConfig: StreamQueryConfig, + queryConfig: StreamQueryConfig, withChangeFlag: Boolean) (implicit tpe: TypeInformation[A]): DataStream[A] = { @@ -610,7 +614,7 @@ abstract class StreamTableEnvironment( } // get CRow plan - val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig) + val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) // convert CRow to output type val conversion = if (withChangeFlag) { @@ -642,16 +646,16 @@ abstract class StreamTableEnvironment( * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]]. * * @param logicalPlan The logical plan to translate. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @return The [[DataStream]] of type [[CRow]]. */ protected def translateToCRow( logicalPlan: RelNode, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { logicalPlan match { case node: DataStreamRel => - node.translateToPlan(this, qConfig) + node.translateToPlan(this, queryConfig) case _ => throw TableException("Cannot generate DataStream due to an invalid logical plan. " + "This is a bug and should not happen. Please file an issue.") @@ -667,7 +671,7 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast, updatesAsRetraction = false) - val dataStream = translateToCRow(optimizedPlan, qConf) + val dataStream = translateToCRow(optimizedPlan, queryConfig) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index c3b5951..311986c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -150,7 +150,7 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - toDataStream(table, clazz, qConf) + toDataStream(table, clazz, queryConfig) } /** @@ -170,7 +170,7 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { - toDataStream(table, typeInfo, qConf) + toDataStream(table, typeInfo, queryConfig) } /** @@ -186,14 +186,17 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param clazz The class of the type of the resulting [[DataStream]]. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = { + def toDataStream[T]( + table: Table, + clazz: Class[T], + queryConfig: StreamQueryConfig): DataStream[T] = { val typeInfo = TypeExtractor.createTypeInfo(clazz) TableEnvironment.validateType(typeInfo) - translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) } /** @@ -209,16 +212,16 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ def toDataStream[T]( table: Table, typeInfo: TypeInformation[T], - qConfig: StreamQueryConfig): DataStream[T] = { + queryConfig: StreamQueryConfig): DataStream[T] = { TableEnvironment.validateType(typeInfo) - translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) } /** @@ -242,7 +245,7 @@ class StreamTableEnvironment( table: Table, clazz: Class[T]): DataStream[JTuple2[JBool, T]] = { - toRetractStream(table, clazz, qConf) + toRetractStream(table, clazz, queryConfig) } /** @@ -266,7 +269,7 @@ class StreamTableEnvironment( table: Table, typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = { - toRetractStream(table, typeInfo, qConf) + toRetractStream(table, typeInfo, queryConfig) } /** @@ -283,21 +286,21 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param clazz The class of the requested record type. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the requested record type. * @return The converted [[DataStream]]. */ def toRetractStream[T]( table: Table, clazz: Class[T], - qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { + queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { val typeInfo = TypeExtractor.createTypeInfo(clazz) TableEnvironment.validateType(typeInfo) val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo) translate[JTuple2[JBool, T]]( table, - qConfig, + queryConfig, updatesAsRetraction = true, withChangeFlag = true)(resultType) } @@ -316,14 +319,14 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param typeInfo The [[TypeInformation]] of the requested record type. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the requested record type. * @return The converted [[DataStream]]. */ def toRetractStream[T]( table: Table, typeInfo: TypeInformation[T], - qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { + queryConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { TableEnvironment.validateType(typeInfo) val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]]( @@ -332,7 +335,7 @@ class StreamTableEnvironment( ) translate[JTuple2[JBool, T]]( table, - qConfig, + queryConfig, updatesAsRetraction = true, withChangeFlag = true)(resultTypeInfo) } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala new file mode 100644 index 0000000..c8fbab7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import _root_.java.io.Serializable +import org.apache.flink.api.common.time.Time + +class QueryConfig private[table] extends Serializable {} + +/** + * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries. + */ +class BatchQueryConfig private[table] extends QueryConfig + +/** + * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries. + * + * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.queryConfig]] + * method. + */ +class StreamQueryConfig private[table] extends QueryConfig { + + /** + * The minimum time until state which was not updated will be retained. + * State might be cleared and removed if it was not updated for the defined period of time. + */ + private var minIdleStateRetentionTime: Long = Long.MinValue + + /** + * The maximum time until state which was not updated will be retained. + * State will be cleared and removed if it was not updated for the defined period of time. + */ + private var maxIdleStateRetentionTime: Long = Long.MinValue + + /** + * Specifies the time interval for how long idle state, i.e., state which was not updated, will + * be retained. When state was not updated for the specified interval of time, it will be cleared + * and removed. + * + * When new data arrives for previously cleaned-up state, the new data will be handled as if it + * was the first data. This can result in previous results being overwritten. + * + * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and + * maximum time for state to be retained. This method is more efficient, because the system has + * to do less bookkeeping to identify the time at which state must be cleared. + * + * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never + * clean-up the state. + */ + def withIdleStateRetentionTime(time: Time): StreamQueryConfig = { + withIdleStateRetentionTime(time, time) + } + + /** + * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which + * was not updated, will be retained. + * State will never be cleared until it was idle for less than the minimum time and will never + * be kept if it was idle for more than the maximum time. + * + * When new data arrives for previously cleaned-up state, the new data will be handled as if it + * was the first data. This can result in previous results being overwritten. + * + * Set to 0 (zero) to never clean-up the state. + * + * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to + * never clean-up the state. + * @param maxTime The maximum time interval for which idle state is retained. May not be smaller + * than than minTime. Set to 0 (zero) to never clean-up the state. + */ + def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = { + if (maxTime.toMilliseconds < minTime.toMilliseconds) { + throw new IllegalArgumentException("maxTime may not be smaller than minTime.") + } + minIdleStateRetentionTime = minTime.toMilliseconds + maxIdleStateRetentionTime = maxTime.toMilliseconds + this + } + + def getMinIdleStateRetentionTime: Long = { + minIdleStateRetentionTime + } + + def getMaxIdleStateRetentionTime: Long = { + maxIdleStateRetentionTime + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 56f7d55..8c6b273 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -143,7 +143,7 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { - toDataStream(table, qConf) + toDataStream(table, queryConfig) } /** @@ -158,14 +158,16 @@ class StreamTableEnvironment( * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * * @param table The [[Table]] to convert. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. */ - def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = { + def toDataStream[T: TypeInformation]( + table: Table, + queryConfig: StreamQueryConfig): DataStream[T] = { val returnType = createTypeInformation[T] - asScalaStream( - translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType)) + asScalaStream(translate( + table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType)) } /** @@ -180,7 +182,7 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { - toRetractStream(table, qConf) + toRetractStream(table, queryConfig) } /** @@ -191,16 +193,16 @@ class StreamTableEnvironment( * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. * * @param table The [[Table]] to convert. - * @param qConfig The configuration of the query to generate. + * @param queryConfig The configuration of the query to generate. * @tparam T The type of the requested data type. * @return The converted [[DataStream]]. */ def toRetractStream[T: TypeInformation]( table: Table, - qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { + queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { val returnType = createTypeInformation[(Boolean, T)] asScalaStream( - translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType)) + translate(table, queryConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 966b42f..9874a9e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -59,12 +59,12 @@ class TableConversions(table: Table) { /** Converts the [[Table]] to a [[DataStream]] of the specified type. * - * @param qConfig The configuration for the generated query. + * @param queryConfig The configuration for the generated query. */ - def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = { + def toDataStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toDataStream(table, qConfig) + tEnv.toDataStream(table, queryConfig) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + @@ -97,14 +97,15 @@ class TableConversions(table: Table) { * * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. * - * @param qConfig The configuration for the generated query. + * @param queryConfig The configuration for the generated query. * */ - def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { + def toRetractStream[T: TypeInformation]( + queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { table.tableEnv match { case tEnv: ScalaStreamTableEnv => - tEnv.toRetractStream(table, qConfig) + tEnv.toRetractStream(table, queryConfig) case _ => throw new TableException( "Only tables that originate from Scala DataStreams " + http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 5a2eb1c..ca61c65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -764,13 +764,13 @@ class Table( */ def writeToSink[T](sink: TableSink[T]): Unit = { - def qConfig = this.tableEnv match { - case s: StreamTableEnvironment => s.qConf + def queryConfig = this.tableEnv match { + case s: StreamTableEnvironment => s.queryConfig case b: BatchTableEnvironment => new BatchQueryConfig case _ => null } - writeToSink(sink, qConfig) + writeToSink(sink, queryConfig) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 0e377b5..5f270f6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -85,11 +85,12 @@ class DataStreamCalc( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig - val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val inputDataStream = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType val generator = new CodeGenerator(config, false, inputRowType) http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index cbd818a..5b32b10 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -84,12 +84,12 @@ class DataStreamCorrelate( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig // we do not need to specify input type - val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo] val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index f01b24a..e5d8088 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -106,16 +106,16 @@ class DataStreamGroupAggregate( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { - if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) { + if (groupings.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { LOG.warn( "No state retention interval configured for a query which accumulates state. " + "Please provide a query configuration with valid retention interval to prevent excessive " + - "state size. You may specify a retention time of 0 to not clean up the state.") + "state size. You may specify a retention time of 0 to not clean up the state.") } - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) val physicalNamedAggregates = namedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( @@ -149,7 +149,7 @@ class DataStreamGroupAggregate( inputSchema.logicalType, inputSchema.physicalFieldTypeInfo, groupings, - qConfig, + queryConfig, DataStreamRetractionRules.isAccRetract(this), DataStreamRetractionRules.isAccRetract(getInput)) http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index d2aaad0..2a71592 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -109,9 +109,9 @@ class DataStreamGroupWindowAggregate( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) val physicalNamedAggregates = namedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 8e97884..a9fbf02 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -36,6 +36,7 @@ import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.slf4j.LoggerFactory class DataStreamOverAggregate( logicWindow: Window, @@ -47,6 +48,7 @@ class DataStreamOverAggregate( extends SingleRel(cluster, traitSet, inputNode) with OverAggregate with DataStreamRel { + private val LOG = LoggerFactory.getLogger(this.getClass) override def deriveRowType(): RelDataType = schema.logicalType @@ -90,7 +92,7 @@ class DataStreamOverAggregate( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { if (logicWindow.groups.size > 1) { throw new TableException( @@ -112,10 +114,23 @@ class DataStreamOverAggregate( "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") } - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + if (consumeRetraction) { + throw new TableException( + "Retraction on Over window aggregation is not supported yet. " + + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") + } + + if (!logicWindow.groups.get(0).keys.isEmpty && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( + "No state retention interval configured for a query which accumulates state. " + + "Please provide a query configuration with valid retention interval to prevent " + + "excessive state size. You may specify a retention time of 0 to not clean up the state.") + } + val generator = new CodeGenerator( tableEnv.getConfig, false, @@ -126,18 +141,13 @@ class DataStreamOverAggregate( .get(orderKey.getFieldIndex) .getType - if (consumeRetraction) { - throw new TableException( - "Retraction on Over window aggregation is not supported yet. " + - "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") - } - timeType match { case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => // proc-time OVER window if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { // unbounded OVER window createUnboundedAndCurrentRowOverWindow( + queryConfig, generator, inputDS, isRowTimeType = false, @@ -145,8 +155,10 @@ class DataStreamOverAggregate( } else if ( overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + // bounded OVER window createBoundedAndCurrentRowOverWindow( + queryConfig, generator, inputDS, isRowTimeType = false, @@ -162,6 +174,7 @@ class DataStreamOverAggregate( overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { // unbounded OVER window createUnboundedAndCurrentRowOverWindow( + queryConfig, generator, inputDS, isRowTimeType = true, @@ -169,6 +182,7 @@ class DataStreamOverAggregate( } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { // bounded OVER window createBoundedAndCurrentRowOverWindow( + queryConfig, generator, inputDS, isRowTimeType = true, @@ -185,6 +199,7 @@ class DataStreamOverAggregate( } def createUnboundedAndCurrentRowOverWindow( + queryConfig: StreamQueryConfig, generator: CodeGenerator, inputDS: DataStream[CRow], isRowTimeType: Boolean, @@ -210,6 +225,7 @@ class DataStreamOverAggregate( inputSchema.physicalType, inputSchema.physicalTypeInfo, inputSchema.physicalFieldTypeInfo, + queryConfig, isRowTimeType, partitionKeys.nonEmpty, isRowsClause) @@ -242,6 +258,7 @@ class DataStreamOverAggregate( } def createBoundedAndCurrentRowOverWindow( + queryConfig: StreamQueryConfig, generator: CodeGenerator, inputDS: DataStream[CRow], isRowTimeType: Boolean, @@ -269,6 +286,7 @@ class DataStreamOverAggregate( inputSchema.physicalTypeInfo, inputSchema.physicalFieldTypeInfo, precedingOffset, + queryConfig, isRowsClause, isRowTimeType ) http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 6f6edf7..65d336f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -29,12 +29,12 @@ trait DataStreamRel extends FlinkRelNode { * Translates the FlinkRelNode into a Flink operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. - * @param qConfig The configuration for the query to generate. + * @param queryConfig The configuration for the query to generate. * @return DataStream of type [[CRow]] */ def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] + queryConfig: StreamQueryConfig): DataStream[CRow] /** * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index e64bf0f..424c6a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -56,7 +56,7 @@ class DataStreamScan( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index 6cc7396..6f4980a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -60,10 +60,10 @@ class DataStreamUnion( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { - val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) - val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) + val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) leftDataSet.union(rightDataSet) } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index ba6b025..d7c490f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -58,7 +58,7 @@ class DataStreamValues( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 225f23f..51e609f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -100,7 +100,7 @@ class StreamTableSourceScan( override def translateToPlan( tableEnv: StreamTableEnvironment, - qConfig: StreamQueryConfig): DataStream[CRow] = { + queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 27392c7..8073959 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -77,6 +77,7 @@ object AggregateUtil { inputType: RelDataType, inputTypeInfo: TypeInformation[Row], inputFieldTypeInfo: Seq[TypeInformation[_]], + queryConfig: StreamQueryConfig, isRowTimeType: Boolean, isPartitioned: Boolean, isRowsClause: Boolean) @@ -117,23 +118,27 @@ object AggregateUtil { new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, - CRowTypeInfo(inputTypeInfo)) + CRowTypeInfo(inputTypeInfo), + queryConfig) } else { // RANGE unbounded over process function new RowTimeUnboundedRangeOver( genFunction, aggregationStateType, - CRowTypeInfo(inputTypeInfo)) + CRowTypeInfo(inputTypeInfo), + queryConfig) } } else { if (isPartitioned) { new ProcTimeUnboundedPartitionedOver( genFunction, - aggregationStateType) + aggregationStateType, + queryConfig) } else { new ProcTimeUnboundedNonPartitionedOver( genFunction, - aggregationStateType) + aggregationStateType, + queryConfig) } } } @@ -155,7 +160,7 @@ object AggregateUtil { inputRowType: RelDataType, inputFieldTypes: Seq[TypeInformation[_]], groupings: Array[Int], - qConfig: StreamQueryConfig, + queryConfig: StreamQueryConfig, generateRetraction: Boolean, consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = { @@ -192,7 +197,7 @@ object AggregateUtil { genFunction, aggregationStateType, generateRetraction, - qConfig) + queryConfig) } @@ -217,6 +222,7 @@ object AggregateUtil { inputTypeInfo: TypeInformation[Row], inputFieldTypeInfo: Seq[TypeInformation[_]], precedingOffset: Long, + queryConfig: StreamQueryConfig, isRowsClause: Boolean, isRowTimeType: Boolean) : ProcessFunction[CRow, CRow] = { @@ -258,15 +264,15 @@ object AggregateUtil { genFunction, aggregationStateType, inputRowType, - precedingOffset - ) + precedingOffset, + queryConfig) } else { new RowTimeBoundedRangeOver( genFunction, aggregationStateType, inputRowType, - precedingOffset - ) + precedingOffset, + queryConfig) } } else { if (isRowsClause) { @@ -274,13 +280,15 @@ object AggregateUtil { genFunction, precedingOffset, aggregationStateType, - inputRowType) + inputRowType, + queryConfig) } else { new ProcTimeBoundedRangeOver( genFunction, precedingOffset, aggregationStateType, - inputRowType) + inputRowType, + queryConfig) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index 84fee87..57ea86e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -41,19 +41,13 @@ class GroupAggProcessFunction( private val genAggregations: GeneratedAggregationsFunction, private val aggregationStateType: RowTypeInfo, private val generateRetraction: Boolean, - private val qConfig: StreamQueryConfig) - extends ProcessFunction[CRow, CRow] + private val queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { val LOG: Logger = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ - private val minRetentionTime = qConfig.getMinIdleStateRetentionTime - private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime - private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1 - // interval in which clean-up timers are registered - private val cleanupTimerInterval = maxRetentionTime - minRetentionTime - private var newRow: CRow = _ private var prevRow: CRow = _ private var firstRow: Boolean = _ @@ -61,8 +55,6 @@ class GroupAggProcessFunction( private var state: ValueState[Row] = _ // counts the number of added and retracted input records private var cntState: ValueState[JLong] = _ - // holds the latest registered cleanup timer - private var cleanupTimeState: ValueState[JLong] = _ override def open(config: Configuration) { LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + @@ -84,11 +76,7 @@ class GroupAggProcessFunction( new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG) cntState = getRuntimeContext.getState(inputCntDescriptor) - if (stateCleaningEnabled) { - val inputCntDescriptor: ValueStateDescriptor[JLong] = - new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG) - cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) - } + initCleanupTimeState("GroupAggregateCleanupTime") } override def processElement( @@ -96,22 +84,9 @@ class GroupAggProcessFunction( ctx: ProcessFunction[CRow, CRow]#Context, out: Collector[CRow]): Unit = { - if (stateCleaningEnabled) { - - val currentTime = ctx.timerService().currentProcessingTime() - val earliestCleanup = currentTime + minRetentionTime - - // last registered timer - val lastCleanupTime = cleanupTimeState.value() - - if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) { - // we need to register a new timer - val cleanupTime = earliestCleanup + cleanupTimerInterval - // register timer and remember clean-up time - ctx.timerService().registerProcessingTimeTimer(cleanupTime) - cleanupTimeState.update(cleanupTime) - } - } + val currentTime = ctx.timerService().currentProcessingTime() + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, currentTime) val input = inputC.row @@ -182,11 +157,8 @@ class GroupAggProcessFunction( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (timestamp == cleanupTimeState.value()) { - // clear all state - this.state.clear() - this.cntState.clear() - this.cleanupTimeState.clear() + if (needToCleanupState(timestamp)) { + cleanupState(state, cntState) } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index 3fb506f..d50912c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import java.util.{ArrayList, List => JList} import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory @@ -48,9 +49,11 @@ class ProcTimeBoundedRangeOver( genAggregations: GeneratedAggregationsFunction, precedingTimeBoundary: Long, aggregatesTypeInfo: RowTypeInfo, - inputType: TypeInformation[CRow]) - extends ProcessFunction[CRow, CRow] + inputType: TypeInformation[CRow], + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { + private var output: CRow = _ private var accumulatorState: ValueState[Row] = _ private var rowMapState: MapState[Long, JList[Row]] = _ @@ -81,6 +84,8 @@ class ProcTimeBoundedRangeOver( val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("overState", aggregatesTypeInfo) accumulatorState = getRuntimeContext.getState(stateDescriptor) + + initCleanupTimeState("ProcTimeBoundedRangeOverCleanupTime") } override def processElement( @@ -89,6 +94,9 @@ class ProcTimeBoundedRangeOver( out: Collector[CRow]): Unit = { val currentTime = ctx.timerService.currentProcessingTime + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, currentTime) + // buffer the event incoming event // add current element to the window list of elements with corresponding timestamp @@ -109,7 +117,15 @@ class ProcTimeBoundedRangeOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - // we consider the original timestamp of events that have registered this time trigger 1 ms ago + if (needToCleanupState(timestamp)) { + // clean up and return + cleanupState(rowMapState, accumulatorState) + return + } + + // we consider the original timestamp of events + // that have registered this time trigger 1 ms ago + val currentTime = timestamp - 1 var i = 0 @@ -153,7 +169,8 @@ class ProcTimeBoundedRangeOver( // get the list of elements of current proctime val currentElements = rowMapState.get(currentTime) - // add current elements to aggregator. Multiple elements might have arrived in the same proctime + // add current elements to aggregator. Multiple elements might + // have arrived in the same proctime // the same accumulator value will be computed for all elements var iElemenets = 0 while (iElemenets < currentElements.size()) { @@ -178,7 +195,6 @@ class ProcTimeBoundedRangeOver( // update the value of accumulators for future incremental computation accumulatorState.update(accumulators) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala index 0c7f44e..e388c93 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala @@ -33,6 +33,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import java.util.{List => JList} import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory @@ -49,8 +50,9 @@ class ProcTimeBoundedRowsOver( genAggregations: GeneratedAggregationsFunction, precedingOffset: Long, aggregatesTypeInfo: RowTypeInfo, - inputType: TypeInformation[CRow]) - extends ProcessFunction[CRow, CRow] + inputType: TypeInformation[CRow], + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { Preconditions.checkArgument(precedingOffset > 0) @@ -99,6 +101,8 @@ class ProcTimeBoundedRowsOver( val smallestTimestampDescriptor : ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor) + + initCleanupTimeState("ProcTimeBoundedRowsOverCleanupTime") } override def processElement( @@ -110,6 +114,9 @@ class ProcTimeBoundedRowsOver( val currentTime = ctx.timerService.currentProcessingTime + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, currentTime) + // initialize state for the processed element var accumulators = accumulatorState.value if (accumulators == null) { @@ -180,4 +187,13 @@ class ProcTimeBoundedRowsOver( out.collect(output) } + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { + + if (needToCleanupState(timestamp)) { + cleanupState(rowMapState, accumulatorState, counterState, smallestTsState) + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala index 8a23132..2a6c9c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.util.Collector import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -37,8 +38,9 @@ import org.slf4j.LoggerFactory */ class ProcTimeUnboundedNonPartitionedOver( genAggregations: GeneratedAggregationsFunction, - aggregationStateType: RowTypeInfo) - extends ProcessFunction[CRow, CRow] + aggregationStateType: RowTypeInfo, + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with CheckpointedFunction with Compiler[GeneratedAggregations] { @@ -68,12 +70,16 @@ class ProcTimeUnboundedNonPartitionedOver( accumulators = function.createAccumulators() } } + initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime") } override def processElement( inputC: CRow, ctx: ProcessFunction[CRow, CRow]#Context, out: Collector[CRow]): Unit = { + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + val input = inputC.row @@ -85,6 +91,16 @@ class ProcTimeUnboundedNonPartitionedOver( out.collect(output) } + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { + + if (needToCleanupState(timestamp)) { + cleanupState(state) + } + } + override def snapshotState(context: FunctionSnapshotContext): Unit = { state.clear() if (null != accumulators) { http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala index 847c1bf..97f0ad7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala @@ -24,6 +24,7 @@ import org.apache.flink.util.Collector import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.CRow import org.slf4j.LoggerFactory @@ -36,8 +37,9 @@ import org.slf4j.LoggerFactory */ class ProcTimeUnboundedPartitionedOver( genAggregations: GeneratedAggregationsFunction, - aggregationStateType: RowTypeInfo) - extends ProcessFunction[CRow, CRow] + aggregationStateType: RowTypeInfo, + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { private var output: CRow = _ @@ -59,6 +61,8 @@ class ProcTimeUnboundedPartitionedOver( val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("overState", aggregationStateType) state = getRuntimeContext.getState(stateDescriptor) + + initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime") } override def processElement( @@ -66,6 +70,9 @@ class ProcTimeUnboundedPartitionedOver( ctx: ProcessFunction[CRow, CRow]#Context, out: Collector[CRow]): Unit = { + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + val input = inputC.row var accumulators = state.value() @@ -83,4 +90,13 @@ class ProcTimeUnboundedPartitionedOver( out.collect(output) } + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { + + if (needToCleanupState(timestamp)) { + cleanupState(state) + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala new file mode 100644 index 0000000..292fd3b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.state.State +import org.apache.flink.streaming.api.TimeDomain +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} + +abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryConfig) + extends ProcessFunction[IN, OUT]{ + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // holds the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected def initCleanupTimeState(stateName: String) { + if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong](stateName, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) + } + } + + protected def registerProcessingCleanupTimer( + ctx: ProcessFunction[IN, OUT]#Context, + currentTime: Long): Unit = { + if (stateCleaningEnabled) { + + // last registered timer + val curCleanupTime = cleanupTimeState.value() + + // check if a cleanup timer is registered and + // that the current cleanup timer won't delete state we need to keep + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { + // we need to register a new (later) timer + val cleanupTime = currentTime + maxRetentionTime + // register timer and remember clean-up time + ctx.timerService().registerProcessingTimeTimer(cleanupTime) + cleanupTimeState.update(cleanupTime) + } + } + } + + protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = { + ctx.timeDomain() == TimeDomain.PROCESSING_TIME + } + + protected def needToCleanupState(timestamp: Long): Boolean = { + if (stateCleaningEnabled) { + val cleanupTime = cleanupTimeState.value() + // check that the triggered timer is the last registered processing time timer. + null != cleanupTime && timestamp == cleanupTime + } else { + false + } + } + + protected def cleanupState(states: State*): Unit = { + // clear all state + states.foreach(_.clear()) + this.cleanupTimeState.clear() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index 4020d44..65edf6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.types.Row @@ -42,8 +43,9 @@ class RowTimeBoundedRangeOver( genAggregations: GeneratedAggregationsFunction, aggregationStateType: RowTypeInfo, inputRowType: CRowTypeInfo, - precedingOffset: Long) - extends ProcessFunction[CRow, CRow] + precedingOffset: Long, + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { Preconditions.checkNotNull(aggregationStateType) Preconditions.checkNotNull(precedingOffset) @@ -97,6 +99,8 @@ class RowTimeBoundedRangeOver( valueTypeInformation) dataState = getRuntimeContext.getMapState(mapStateDescriptor) + + initCleanupTimeState("RowTimeBoundedRangeOverCleanupTime") } override def processElement( @@ -106,6 +110,9 @@ class RowTimeBoundedRangeOver( val input = inputC.row + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + // triggering timestamp for trigger calculation val triggeringTs = ctx.timestamp @@ -131,6 +138,34 @@ class RowTimeBoundedRangeOver( timestamp: Long, ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { + + if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { + if (needToCleanupState(timestamp)) { + + val keysIt = dataState.keys.iterator() + val lastProcessedTime = lastTriggeringTsState.value + + // is data left which has not been processed yet? + var noRecordsToProcess = true + while (keysIt.hasNext && noRecordsToProcess) { + if (keysIt.next() > lastProcessedTime) { + noRecordsToProcess = false + } + } + + if (noRecordsToProcess) { + // we clean the state + cleanupState(dataState, accumulatorState, lastTriggeringTsState) + } else { + // There are records left to process because a watermark has not been received yet. + // This would only happen if the input stream has stopped. So we don't need to clean up. + // We leave the state as it is and schedule a new cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + } + } + return + } + // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) @@ -196,8 +231,11 @@ class RowTimeBoundedRangeOver( // update state accumulatorState.update(accumulators) - lastTriggeringTsState.update(timestamp) } + lastTriggeringTsState.update(timestamp) + + // update cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala index 5ec6ec7..395ae39 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} @@ -43,8 +44,9 @@ class RowTimeBoundedRowsOver( genAggregations: GeneratedAggregationsFunction, aggregationStateType: RowTypeInfo, inputRowType: CRowTypeInfo, - precedingOffset: Long) - extends ProcessFunction[CRow, CRow] + precedingOffset: Long, + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { Preconditions.checkNotNull(aggregationStateType) @@ -106,6 +108,8 @@ class RowTimeBoundedRowsOver( valueTypeInformation) dataState = getRuntimeContext.getMapState(mapStateDescriptor) + + initCleanupTimeState("RowTimeBoundedRowsOverCleanupTime") } override def processElement( @@ -115,6 +119,9 @@ class RowTimeBoundedRowsOver( val input = inputC.row + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + // triggering timestamp for trigger calculation val triggeringTs = ctx.timestamp @@ -141,6 +148,33 @@ class RowTimeBoundedRowsOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { + if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { + if (needToCleanupState(timestamp)) { + + val keysIt = dataState.keys.iterator() + val lastProcessedTime = lastTriggeringTsState.value + + // is data left which has not been processed yet? + var noRecordsToProcess = true + while (keysIt.hasNext && noRecordsToProcess) { + if (keysIt.next() > lastProcessedTime) { + noRecordsToProcess = false + } + } + + if (noRecordsToProcess) { + // We clean the state + cleanupState(dataState, accumulatorState, dataCountState, lastTriggeringTsState) + } else { + // There are records left to process because a watermark has not been received yet. + // This would only happen if the input stream has stopped. So we don't need to clean up. + // We leave the state as it is and schedule a new cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + } + } + return + } + // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) @@ -220,6 +254,9 @@ class RowTimeBoundedRowsOver( } lastTriggeringTsState.update(timestamp) + + // update cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b61d153/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala index 3e2a811..741d2b4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -28,6 +28,7 @@ import org.apache.flink.util.{Collector, Preconditions} import org.apache.flink.api.common.state._ import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory @@ -43,8 +44,9 @@ import org.slf4j.LoggerFactory abstract class RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[CRow]) - extends ProcessFunction[CRow, CRow] + inputType: TypeInformation[CRow], + queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { protected var output: CRow = _ @@ -83,6 +85,8 @@ abstract class RowTimeUnboundedOver( new MapStateDescriptor[Long, JList[Row]]("rowmapstate", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + + initCleanupTimeState("RowTimeUnboundedOverCleanupTime") } /** @@ -101,6 +105,9 @@ abstract class RowTimeUnboundedOver( val input = inputC.row + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + val timestamp = ctx.timestamp() val curWatermark = ctx.timerService().currentWatermark() @@ -133,6 +140,24 @@ abstract class RowTimeUnboundedOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { + if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { + if (needToCleanupState(timestamp)) { + + // we check whether there are still records which have not been processed yet + val noRecordsToProcess = !rowMapState.keys.iterator().hasNext + if (noRecordsToProcess) { + // we clean the state + cleanupState(rowMapState, accumulatorState) + } else { + // There are records left to process because a watermark has not been received yet. + // This would only happen if the input stream has stopped. So we don't need to clean up. + // We leave the state as it is and schedule a new cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + } + } + return + } + Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]]) val collector = out.asInstanceOf[TimestampedCollector[CRow]] @@ -178,6 +203,9 @@ abstract class RowTimeUnboundedOver( ctx.timerService.registerEventTimeTimer(curWatermark + 1) } } + + // update cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } /** @@ -221,11 +249,13 @@ abstract class RowTimeUnboundedOver( class RowTimeUnboundedRowsOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[CRow]) + inputType: TypeInformation[CRow], + queryConfig: StreamQueryConfig) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, - inputType) { + inputType, + queryConfig) { override def processElementsWithSameTimestamp( curRowList: JList[Row], @@ -259,11 +289,13 @@ class RowTimeUnboundedRowsOver( class RowTimeUnboundedRangeOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[CRow]) + inputType: TypeInformation[CRow], + queryConfig: StreamQueryConfig) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, - inputType) { + inputType, + queryConfig) { override def processElementsWithSameTimestamp( curRowList: JList[Row],