Repository: flink Updated Branches: refs/heads/master 704098725 -> b50ef4b8d
[FLINK-6491] [table] Add QueryConfig and state clean up for non-windowed aggregates. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16339db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16339db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16339db Branch: refs/heads/master Commit: d16339db83b98f41ef14fbd278530dc219f02ed8 Parents: 7040987 Author: Fabian Hueske <fhue...@apache.org> Authored: Mon May 8 18:41:37 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu May 11 23:42:19 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 17 ++- .../apache/flink/table/api/QueryConfig.scala | 102 ++++++++++++++++ .../table/api/StreamTableEnvironment.scala | 51 ++++++-- .../flink/table/api/TableEnvironment.scala | 2 +- .../table/api/java/StreamTableEnvironment.scala | 115 +++++++++++++++++-- .../api/scala/StreamTableEnvironment.scala | 46 +++++++- .../table/api/scala/TableConversions.scala | 40 ++++++- .../org/apache/flink/table/api/table.scala | 26 ++++- .../plan/nodes/datastream/DataStreamCalc.scala | 8 +- .../nodes/datastream/DataStreamCorrelate.scala | 8 +- .../datastream/DataStreamGroupAggregate.scala | 20 +++- .../DataStreamGroupWindowAggregate.scala | 8 +- .../datastream/DataStreamOverAggregate.scala | 9 +- .../plan/nodes/datastream/DataStreamRel.scala | 7 +- .../plan/nodes/datastream/DataStreamScan.scala | 7 +- .../plan/nodes/datastream/DataStreamUnion.scala | 10 +- .../nodes/datastream/DataStreamValues.scala | 6 +- .../datastream/StreamTableSourceScan.scala | 7 +- .../table/runtime/aggregate/AggregateUtil.scala | 6 +- .../aggregate/GroupAggProcessFunction.scala | 54 ++++++++- .../table/utils/MockTableEnvironment.scala | 9 +- 21 files changed, 494 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 2a3cedf..f33c187 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -32,7 +32,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.explain.PlanJsonParser -import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute} +import org.apache.flink.table.expressions.{Expression, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets @@ -113,9 +113,20 @@ abstract class BatchTableEnvironment( * * @param table The [[Table]] to write. * @param sink The [[TableSink]] to write the [[Table]] to. + * @param qConfig The configuration for the query to generate. * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. */ - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T]( + table: Table, + sink: TableSink[T], + qConfig: QueryConfig): Unit = { + + // We do not pass the configuration on, because there is nothing to configure for batch queries. 
+ val bQConfig = qConfig match { + case batchConfig: BatchQueryConfig => batchConfig + case _ => + throw new TableException("BatchQueryConfig required to configure batch query.") + } sink match { case batchSink: BatchTableSink[T] => @@ -125,7 +136,7 @@ abstract class BatchTableEnvironment( // Give the DataSet to the TableSink to emit it. batchSink.emitDataSet(result) case _ => - throw new TableException("BatchTableSink required to emit batch Table") + throw new TableException("BatchTableSink required to emit batch Table.") } } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala new file mode 100644 index 0000000..8e8b5ac --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api + +import _root_.java.io.Serializable +import org.apache.flink.api.common.time.Time + +class QueryConfig private[table] extends Serializable {} + +/** + * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries. + */ +class BatchQueryConfig private[table] extends QueryConfig + +/** + * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries. + * + * A new [[StreamQueryConfig]] with default settings can be created using the + * [[StreamTableEnvironment.qConf]] method. + */ +class StreamQueryConfig private[table] extends QueryConfig { + + /** + * The minimum time for which idle state, i.e., state which was not updated, will be retained. + * State might be cleared and removed if it was not updated for the defined period of time. + */ + private var minIdleStateRetentionTime: Long = Long.MinValue + + /** + * The maximum time for which idle state, i.e., state which was not updated, will be retained. + * State will be cleared and removed if it was not updated for the defined period of time. + */ + private var maxIdleStateRetentionTime: Long = Long.MinValue + + /** + * Specifies the time interval for how long idle state, i.e., state which was not updated, will + * be retained. When state was not updated for the specified interval of time, it will be cleared + * and removed. + * + * When new data arrives for previously cleaned-up state, the new data will be handled as if it + * was the first data. This can result in previous results being overwritten. + * + * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows setting a minimum and + * maximum time for state to be retained. That method is more efficient, because the system has + * to do less bookkeeping to identify the time at which state must be cleared. + * + * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never + * clean-up the state.
+ */ + def setIdleStateRetentionTime(time: Time): StreamQueryConfig = { + setIdleStateRetentionTime(time, time) + } + + /** + * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which + * was not updated, will be retained. + * State will never be cleared while it has been idle for less than the minimum time and will + * never be kept once it has been idle for more than the maximum time. + * + * When new data arrives for previously cleaned-up state, the new data will be handled as if it + * was the first data. This can result in previous results being overwritten. + * + * Set to 0 (zero) to never clean-up the state. + * + * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to + * never clean-up the state. + * @param maxTime The maximum time interval for which idle state is retained. May not be smaller + * than minTime. Set to 0 (zero) to never clean-up the state. + */ + def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = { + if (maxTime.toMilliseconds < minTime.toMilliseconds) { + throw new IllegalArgumentException("maxTime may not be smaller than minTime.") + } + minIdleStateRetentionTime = minTime.toMilliseconds + maxIdleStateRetentionTime = maxTime.toMilliseconds + this + } + + def getMinIdleStateRetentionTime: Long = { + minIdleStateRetentionTime + } + + def getMaxIdleStateRetentionTime: Long = { + maxIdleStateRetentionTime + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index aef2b1b..c594d4c 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -81,6 +81,8 @@ abstract class StreamTableEnvironment( // the naming pattern for internally registered tables. private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r + def qConf: StreamQueryConfig = new StreamQueryConfig + /** * Checks if the chosen table name is valid. * @@ -126,9 +128,20 @@ abstract class StreamTableEnvironment( * * @param table The [[Table]] to write. * @param sink The [[TableSink]] to write the [[Table]] to. + * @param qConfig The configuration for the query to generate. * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T]( + table: Table, + sink: TableSink[T], + qConfig: QueryConfig): Unit = { + + // Check query configuration + val sQConf = qConfig match { + case streamConfig: StreamQueryConfig => streamConfig + case _ => + throw new TableException("StreamQueryConfig required to configure stream query.") + } sink match { @@ -137,7 +150,7 @@ abstract class StreamTableEnvironment( val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. val result: DataStream[T] = - translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType) + translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType) // Give the DataStream to the TableSink to emit it. retractSink.asInstanceOf[RetractStreamTableSink[Any]] .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) @@ -160,7 +173,11 @@ abstract class StreamTableEnvironment( val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. 
val result: DataStream[T] = - translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType) + translate( + optimizedPlan, + table.getRelNode.getRowType, + sQConf, + withChangeFlag = true)(outputType) // Give the DataStream to the TableSink to emit it. upsertSink.asInstanceOf[UpsertStreamTableSink[Any]] .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) @@ -176,7 +193,11 @@ abstract class StreamTableEnvironment( val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. val result: DataStream[T] = - translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType) + translate( + optimizedPlan, + table.getRelNode.getRowType, + sQConf, + withChangeFlag = false)(outputType) // Give the DataStream to the TableSink to emit it. appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result) @@ -545,17 +566,21 @@ abstract class StreamTableEnvironment( * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. * * @param table The root node of the relational expression tree. + * @param qConfig The configuration for the query to generate. * @param updatesAsRetraction Set to true to encode updates as retraction messages. * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. 
*/ - protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean) - (implicit tpe: TypeInformation[A]): DataStream[A] = { + protected def translate[A]( + table: Table, + qConfig: StreamQueryConfig, + updatesAsRetraction: Boolean, + withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = { val relNode = table.getRelNode val dataStreamPlan = optimize(relNode, updatesAsRetraction) - translate(dataStreamPlan, relNode.getRowType, withChangeFlag) + translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag) } /** @@ -564,6 +589,7 @@ abstract class StreamTableEnvironment( * @param logicalPlan The root node of the relational expression tree. * @param logicalType The row type of the result. Since the logicalPlan can lose the * field naming during optimization we pass the row type separately. + * @param qConfig The configuration for the query to generate. * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. @@ -572,6 +598,7 @@ abstract class StreamTableEnvironment( protected def translate[A]( logicalPlan: RelNode, logicalType: RelDataType, + qConfig: StreamQueryConfig, withChangeFlag: Boolean) (implicit tpe: TypeInformation[A]): DataStream[A] = { @@ -583,7 +610,7 @@ abstract class StreamTableEnvironment( } // get CRow plan - val plan: DataStream[CRow] = translateToCRow(logicalPlan) + val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig) // convert CRow to output type val conversion = if (withChangeFlag) { @@ -615,14 +642,16 @@ abstract class StreamTableEnvironment( * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]]. * * @param logicalPlan The logical plan to translate. + * @param qConfig The configuration for the query to generate. * @return The [[DataStream]] of type [[CRow]]. 
*/ protected def translateToCRow( - logicalPlan: RelNode): DataStream[CRow] = { + logicalPlan: RelNode, + qConfig: StreamQueryConfig): DataStream[CRow] = { logicalPlan match { case node: DataStreamRel => - node.translateToPlan(this) + node.translateToPlan(this, qConfig) case _ => throw TableException("Cannot generate DataStream due to an invalid logical plan. " + "This is a bug and should not happen. Please file an issue.") @@ -638,7 +667,7 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast, updatesAsRetraction = false) - val dataStream = translateToCRow(optimizedPlan) + val dataStream = translateToCRow(optimizedPlan, qConf) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bf4a8e0..9f50f0c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -510,7 +510,7 @@ abstract class TableEnvironment(val config: TableConfig) { * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The data type that the [[TableSink]] expects. */ - private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit + private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit /** * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. 
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index a70bcca..c3b5951 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -150,9 +150,50 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { + toDataStream(table, clazz, qConf) + } + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { + toDataStream(table, typeInfo, qConf) + } + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. 
+ * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the type of the resulting [[DataStream]]. + * @param qConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = { val typeInfo = TypeExtractor.createTypeInfo(clazz) TableEnvironment.validateType(typeInfo) - translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) } /** @@ -168,12 +209,64 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]]. + * @param qConfig The configuration of the query to generate. * @tparam T The type of the resulting [[DataStream]]. * @return The converted [[DataStream]]. 
*/ - def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { + def toDataStream[T]( + table: Table, + typeInfo: TypeInformation[T], + qConfig: StreamQueryConfig): DataStream[T] = { TableEnvironment.validateType(typeInfo) - translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. + * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. + */ + def toRetractStream[T]( + table: Table, + clazz: Class[T]): DataStream[JTuple2[JBool, T]] = { + + toRetractStream(table, clazz, qConf) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. 
+ * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. + */ + def toRetractStream[T]( + table: Table, + typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = { + + toRetractStream(table, typeInfo, qConf) } /** @@ -190,17 +283,21 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param clazz The class of the requested record type. + * @param qConfig The configuration of the query to generate. * @tparam T The type of the requested record type. * @return The converted [[DataStream]]. */ - def toRetractStream[T](table: Table, clazz: Class[T]): - DataStream[JTuple2[JBool, T]] = { + def toRetractStream[T]( + table: Table, + clazz: Class[T], + qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { val typeInfo = TypeExtractor.createTypeInfo(clazz) TableEnvironment.validateType(typeInfo) val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo) translate[JTuple2[JBool, T]]( table, + qConfig, updatesAsRetraction = true, withChangeFlag = true)(resultType) } @@ -219,11 +316,14 @@ class StreamTableEnvironment( * * @param table The [[Table]] to convert. * @param typeInfo The [[TypeInformation]] of the requested record type. + * @param qConfig The configuration of the query to generate. * @tparam T The type of the requested record type. * @return The converted [[DataStream]]. 
*/ - def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]): - DataStream[JTuple2[JBool, T]] = { + def toRetractStream[T]( + table: Table, + typeInfo: TypeInformation[T], + qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = { TableEnvironment.validateType(typeInfo) val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]]( @@ -232,6 +332,7 @@ class StreamTableEnvironment( ) translate[JTuple2[JBool, T]]( table, + qConfig, updatesAsRetraction = true, withChangeFlag = true)(resultTypeInfo) } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index e5ad6c2..56f7d55 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} +import org.apache.flink.table.api.{StreamQueryConfig, Table, TableConfig, TableEnvironment} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{AggregateFunction, TableFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} @@ -143,8 +143,29 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. 
*/ def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { + toDataStream(table, qConf) + } + + /** + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. + * + * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param qConfig The configuration of the query to generate. + * @tparam T The type of the resulting [[DataStream]]. + * @return The converted [[DataStream]]. + */ + def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = { val returnType = createTypeInformation[T] - asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType)) + asScalaStream( + translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType)) } /** @@ -159,8 +180,27 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { + toRetractStream(table, qConf) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. + * + * @param table The [[Table]] to convert. + * @param qConfig The configuration of the query to generate. + * @tparam T The type of the requested data type. + * @return The converted [[DataStream]]. 
+ */ + def toRetractStream[T: TypeInformation]( + table: Table, + qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { val returnType = createTypeInformation[(Boolean, T)] - asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType)) + asScalaStream( + translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 5efff62..966b42f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream -import org.apache.flink.table.api.{Table, TableException} +import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv} import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv} @@ -57,6 +57,21 @@ class TableConversions(table: Table) { } } + /** Converts the [[Table]] to a [[DataStream]] of the specified type. + * + * @param qConfig The configuration for the generated query. 
+ */ + def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = { + table.tableEnv match { + case tEnv: ScalaStreamTableEnv => + tEnv.toDataStream(table, qConfig) + case _ => + throw new TableException( + "Only tables that originate from Scala DataStreams " + + "can be converted to Scala DataStreams.") + } + } + /** Converts the [[Table]] to a [[DataStream]] of add and retract messages. * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, * the second field holds the record of the specified type [[T]]. @@ -76,5 +91,28 @@ class TableConversions(table: Table) { } } + /** Converts the [[Table]] to a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. + * + * @param qConfig The configuration for the generated query. 
+ * + */ + def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = { + + table.tableEnv match { + case tEnv: ScalaStreamTableEnv => + tEnv.toRetractStream(table, qConfig) + case _ => + throw new TableException( + "Only tables that originate from Scala DataStreams " + + "can be converted to Scala DataStreams.") + } + } + + + } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 310a75f..5a2eb1c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -763,6 +763,30 @@ class Table( * @tparam T The data type that the [[TableSink]] expects. */ def writeToSink[T](sink: TableSink[T]): Unit = { + + def qConfig = this.tableEnv match { + case s: StreamTableEnvironment => s.qConf + case b: BatchTableEnvironment => new BatchQueryConfig + case _ => null + } + + writeToSink(sink, qConfig) + } + + /** + * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. + * + * A batch [[Table]] can only be written to a + * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. + * + * @param sink The [[TableSink]] to which the [[Table]] is written. + * @param conf The configuration for the query that writes to the sink. + * @tparam T The data type that the [[TableSink]] expects. 
+    */
+  def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
     // get schema information of table
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -773,7 +797,7 @@ class Table(
     val configuredSink = sink.configure(fieldNames, fieldTypes)
 
     // emit the table to the configured table sink
-    tableEnv.writeToSink(this, configuredSink)
+    tableEnv.writeToSink(this, configuredSink, conf)
   }
 
   /**
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index ce0f966..0e377b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
@@ -83,11 +83,13 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
     val generator = new CodeGenerator(config, false, inputRowType)
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 19ad89b..cbd818a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -82,12 +82,14 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
-    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 506c0cb..f01b24a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -21,9 +21,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -31,6 +31,7 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
 
 /**
   *
@@ -59,6 +60,8 @@ class DataStreamGroupAggregate(
   with CommonAggregate
   with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType() = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -100,9 +103,18 @@ class DataStreamGroupAggregate(
       inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent excessive " +
+        "state size. You may specify a retention time of 0 to not clean up the state.")
+    }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
@@ -136,6 +148,7 @@ class DataStreamGroupAggregate(
         inputSchema.logicalType,
         inputSchema.physicalFieldTypeInfo,
         groupings,
+        qConfig,
         DataStreamRetractionRules.isAccRetract(this),
         DataStreamRetractionRules.isAccRetract(getInput))
 
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ef207b0..d2aaad0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.expressions.ExpressionUtils._ @@ -107,9 +107,11 @@ class DataStreamGroupWindowAggregate( namedProperties)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) val physicalNamedAggregates = namedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 4061242..8e97884 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.OverAggregate import org.apache.flink.table.plan.schema.RowSchema @@ 
-88,7 +88,10 @@ class DataStreamOverAggregate( namedAggregates)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { + if (logicWindow.groups.size > 1) { throw new TableException( "Unsupported use of OVER windows. All aggregates must be computed on the same window.") @@ -109,7 +112,7 @@ class DataStreamOverAggregate( "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") } - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 9754de4..6f6edf7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} import org.apache.flink.table.plan.nodes.FlinkRelNode import org.apache.flink.table.runtime.types.CRow @@ -29,9 +29,12 @@ trait DataStreamRel extends FlinkRelNode { * Translates the FlinkRelNode into a Flink 
operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. + * @param qConfig The configuration for the query to generate. * @return DataStream of type [[CRow]] */ - def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow] + def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] /** * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index c613646..e64bf0f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.schema.DataStreamTable import org.apache.flink.table.runtime.types.CRow @@ -54,7 +54,10 @@ class DataStreamScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { + val config = tableEnv.getConfig 
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream convertToInternalRow(schema, inputDataStream, dataStreamTable, config) http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index 654c259..6cc7396 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.types.CRow @@ -58,10 +58,12 @@ class DataStreamUnion( s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))" } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { - val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) + val rightDataSet = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig) leftDataSet.union(rightDataSet) } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index 32c9aaf..ba6b025 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.io.CRowValuesInputFormat @@ -56,7 +56,9 @@ class DataStreamValues( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index b2d7019..225f23f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -22,7 +22,7 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.RowSchema @@ -98,7 +98,10 @@ class StreamTableSourceScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { + override def translateToPlan( + tableEnv: StreamTableEnvironment, + qConfig: StreamQueryConfig): DataStream[CRow] = { + val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] convertToInternalRow( http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 768c9cb..27392c7 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{StreamQueryConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator @@ -155,6 +155,7 @@ object AggregateUtil { inputRowType: RelDataType, inputFieldTypes: Seq[TypeInformation[_]], groupings: Array[Int], + qConfig: StreamQueryConfig, generateRetraction: Boolean, consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = { @@ -190,7 +191,8 @@ object AggregateUtil { new GroupAggProcessFunction( genFunction, aggregationStateType, - generateRetraction) + generateRetraction, + qConfig) } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index 6ee37e6..84fee87 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -26,9 +26,9 @@ import org.apache.flink.util.Collector import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState -import org.apache.flink.table.api.Types +import org.apache.flink.table.api.{StreamQueryConfig, Types} import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} -import org.slf4j.LoggerFactory +import org.slf4j.{Logger, LoggerFactory} import org.apache.flink.table.runtime.types.CRow /** @@ -40,13 +40,20 @@ import org.apache.flink.table.runtime.types.CRow class GroupAggProcessFunction( private val genAggregations: GeneratedAggregationsFunction, private val aggregationStateType: RowTypeInfo, - private val generateRetraction: Boolean) + private val generateRetraction: Boolean, + private val qConfig: StreamQueryConfig) extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { - val LOG = LoggerFactory.getLogger(this.getClass) + val LOG: Logger = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ + private val minRetentionTime = qConfig.getMinIdleStateRetentionTime + private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime + private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1 + // interval in which clean-up timers are registered + private val cleanupTimerInterval = maxRetentionTime - minRetentionTime + private var newRow: CRow = _ private var prevRow: CRow = _ private var firstRow: Boolean = _ @@ -54,6 +61,8 @@ class GroupAggProcessFunction( private var state: ValueState[Row] = _ // counts the number of added and retracted input records private var cntState: ValueState[JLong] = _ + // holds the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ override def open(config: Configuration) { 
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + @@ -74,6 +83,12 @@ class GroupAggProcessFunction( val inputCntDescriptor: ValueStateDescriptor[JLong] = new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG) cntState = getRuntimeContext.getState(inputCntDescriptor) + + if (stateCleaningEnabled) { + val inputCntDescriptor: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG) + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) + } } override def processElement( @@ -81,6 +96,23 @@ class GroupAggProcessFunction( ctx: ProcessFunction[CRow, CRow]#Context, out: Collector[CRow]): Unit = { + if (stateCleaningEnabled) { + + val currentTime = ctx.timerService().currentProcessingTime() + val earliestCleanup = currentTime + minRetentionTime + + // last registered timer + val lastCleanupTime = cleanupTimeState.value() + + if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) { + // we need to register a new timer + val cleanupTime = earliestCleanup + cleanupTimerInterval + // register timer and remember clean-up time + ctx.timerService().registerProcessingTimeTimer(cleanupTime) + cleanupTimeState.update(cleanupTime) + } + } + val input = inputC.row // get accumulators and input counter @@ -144,4 +176,18 @@ class GroupAggProcessFunction( cntState.clear() } } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { + + if (timestamp == cleanupTimeState.value()) { + // clear all state + this.state.clear() + this.cntState.clear() + this.cleanupTimeState.clear() + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 8626b07..3d79e22 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -18,16 +18,17 @@ package org.apache.flink.table.utils -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.tools.RuleSet -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} +import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource class MockTableEnvironment extends TableEnvironment(new TableConfig) { - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ??? + override private[flink] def writeToSink[T]( + table: Table, + sink: TableSink[T], + qConfig: QueryConfig): Unit = ??? override protected def checkValidTableName(name: String): Unit = ???