Repository: flink
Updated Branches:
  refs/heads/master 704098725 -> b50ef4b8d


[FLINK-6491] [table] Add QueryConfig and state clean up for non-windowed aggregates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16339db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16339db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16339db

Branch: refs/heads/master
Commit: d16339db83b98f41ef14fbd278530dc219f02ed8
Parents: 7040987
Author: Fabian Hueske <fhue...@apache.org>
Authored: Mon May 8 18:41:37 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 11 23:42:19 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  17 ++-
 .../apache/flink/table/api/QueryConfig.scala    | 102 ++++++++++++++++
 .../table/api/StreamTableEnvironment.scala      |  51 ++++++--
 .../flink/table/api/TableEnvironment.scala      |   2 +-
 .../table/api/java/StreamTableEnvironment.scala | 115 +++++++++++++++++--
 .../api/scala/StreamTableEnvironment.scala      |  46 +++++++-
 .../table/api/scala/TableConversions.scala      |  40 ++++++-
 .../org/apache/flink/table/api/table.scala      |  26 ++++-
 .../plan/nodes/datastream/DataStreamCalc.scala  |   8 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |   8 +-
 .../datastream/DataStreamGroupAggregate.scala   |  20 +++-
 .../DataStreamGroupWindowAggregate.scala        |   8 +-
 .../datastream/DataStreamOverAggregate.scala    |   9 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |   7 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   7 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |  10 +-
 .../nodes/datastream/DataStreamValues.scala     |   6 +-
 .../datastream/StreamTableSourceScan.scala      |   7 +-
 .../table/runtime/aggregate/AggregateUtil.scala |   6 +-
 .../aggregate/GroupAggProcessFunction.scala     |  54 ++++++++-
 .../table/utils/MockTableEnvironment.scala      |   9 +-
 21 files changed, 494 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 2a3cedf..f33c187 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
+import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -113,9 +113,20 @@ abstract class BatchTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param qConfig The configuration for the query to generate.
    * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
     */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = {
+
+    // We do not pass the configuration on, because there is nothing to configure for batch queries.
+    val bQConfig = qConfig match {
+      case batchConfig: BatchQueryConfig => batchConfig
+      case _ =>
+        throw new TableException("BatchQueryConfig required to configure batch 
query.")
+    }
 
     sink match {
       case batchSink: BatchTableSink[T] =>
@@ -125,7 +136,7 @@ abstract class BatchTableEnvironment(
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
       case _ =>
-        throw new TableException("BatchTableSink required to emit batch Table")
+        throw new TableException("BatchTableSink required to emit batch 
Table.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
new file mode 100644
index 0000000..8e8b5ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.io.Serializable
+import org.apache.flink.api.common.time.Time
+
+class QueryConfig private[table] extends Serializable {}
+
+/**
+  * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
+  */
+class BatchQueryConfig private[table] extends QueryConfig
+
+/**
+  * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
+  *
+  * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]]
+  * method.
+  */
+class StreamQueryConfig private[table] extends QueryConfig {
+
+  /**
+    * The minimum time for which idle state, i.e., state that was not updated, is retained.
+    * State might be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var minIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * The maximum time for which idle state, i.e., state that was not updated, is retained.
+    * State will be cleared and removed if it was not updated for the defined period of time.
+    */
+  private var maxIdleStateRetentionTime: Long = Long.MinValue
+
+  /**
+    * Specifies the time interval for how long idle state, i.e., state which was not updated, will
+    * be retained. When state was not updated for the specified interval of time, it will be cleared
+    * and removed.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows setting a minimum and
+    * maximum time for state to be retained. That method is more efficient, because the system has
+    * to do less bookkeeping to identify the time at which state must be cleared.
+    *
+    * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
+    *             clean-up the state.
+    */
+  def setIdleStateRetentionTime(time: Time): StreamQueryConfig = {
+    setIdleStateRetentionTime(time, time)
+  }
+
+  /**
+    * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+    * was not updated, will be retained.
+    * State is never cleared before it was idle for at least the minimum time, and it is never
+    * kept after it was idle for more than the maximum time.
+    *
+    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+    * was the first data. This can result in previous results being overwritten.
+    *
+    * Set to 0 (zero) to never clean-up the state.
+    *
+    * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+    *                never clean-up the state.
+    * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
+    *                than minTime. Set to 0 (zero) to never clean-up the state.
+    */
+  def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
+    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
+      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+    }
+    minIdleStateRetentionTime = minTime.toMilliseconds
+    maxIdleStateRetentionTime = maxTime.toMilliseconds
+    this
+  }
+
+  def getMinIdleStateRetentionTime: Long = {
+    minIdleStateRetentionTime
+  }
+
+  def getMaxIdleStateRetentionTime: Long = {
+    maxIdleStateRetentionTime
+  }
+}

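For illustration, a minimal usage sketch of the configuration class added above
(assumed names: `tEnv` is a StreamTableEnvironment; the retention values are
arbitrary). With the values below, state that was not updated for twelve hours
becomes eligible for clean-up and is removed at the latest after twenty-four
hours of inactivity:

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.table.api.StreamQueryConfig

    // qConf returns a fresh StreamQueryConfig; the setter returns `this`,
    // so the configuration can be built in a single chained expression.
    val qConf: StreamQueryConfig = tEnv.qConf
      .setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
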
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index aef2b1b..c594d4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
+  def qConf: StreamQueryConfig = new StreamQueryConfig
+
   /**
     * Checks if the chosen table name is valid.
     *
@@ -126,9 +128,20 @@ abstract class StreamTableEnvironment(
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param qConfig The configuration for the query to generate.
    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
     */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = {
+
+    // Check query configuration
+    val sQConf = qConfig match {
+      case streamConfig: StreamQueryConfig => streamConfig
+      case _ =>
+        throw new TableException("StreamQueryConfig required to configure 
stream query.")
+    }
 
     sink match {
 
@@ -137,7 +150,7 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+          translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         retractSink.asInstanceOf[RetractStreamTableSink[Any]]
           .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -160,7 +173,11 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType)
+          translate(
+            optimizedPlan,
+            table.getRelNode.getRowType,
+            sQConf,
+            withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
           .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -176,7 +193,11 @@ abstract class StreamTableEnvironment(
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
-          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType)
+          translate(
+            optimizedPlan,
+            table.getRelNode.getRowType,
+            sQConf,
+            withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
         appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
 
@@ -545,17 +566,21 @@ abstract class StreamTableEnvironment(
    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
     *
     * @param table The root node of the relational expression tree.
+    * @param qConfig The configuration for the query to generate.
    * @param updatesAsRetraction Set to true to encode updates as retraction messages.
     * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean)
-      (implicit tpe: TypeInformation[A]): DataStream[A] = {
+  protected def translate[A](
+      table: Table,
+      qConfig: StreamQueryConfig,
+      updatesAsRetraction: Boolean,
+      withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
-    translate(dataStreamPlan, relNode.getRowType, withChangeFlag)
+    translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag)
   }
 
   /**
@@ -564,6 +589,7 @@ abstract class StreamTableEnvironment(
     * @param logicalPlan The root node of the relational expression tree.
    * @param logicalType The row type of the result. Since the logicalPlan can lose the
    *                    field naming during optimization we pass the row type separately.
+    * @param qConfig     The configuration for the query to generate.
     * @param withChangeFlag Set to true to emit records with change flags.
    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
@@ -572,6 +598,7 @@ abstract class StreamTableEnvironment(
   protected def translate[A](
       logicalPlan: RelNode,
       logicalType: RelDataType,
+      qConfig: StreamQueryConfig,
       withChangeFlag: Boolean)
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
@@ -583,7 +610,7 @@ abstract class StreamTableEnvironment(
     }
 
     // get CRow plan
-    val plan: DataStream[CRow] = translateToCRow(logicalPlan)
+    val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig)
 
     // convert CRow to output type
     val conversion = if (withChangeFlag) {
@@ -615,14 +642,16 @@ abstract class StreamTableEnvironment(
    * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
     *
     * @param logicalPlan The logical plan to translate.
+    * @param qConfig The configuration for the query to generate.
     * @return The [[DataStream]] of type [[CRow]].
     */
   protected def translateToCRow(
-    logicalPlan: RelNode): DataStream[CRow] = {
+    logicalPlan: RelNode,
+    qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     logicalPlan match {
       case node: DataStreamRel =>
-        node.translateToPlan(this)
+        node.translateToPlan(this, qConfig)
       case _ =>
         throw TableException("Cannot generate DataStream due to an invalid 
logical plan. " +
           "This is a bug and should not happen. Please file an issue.")
@@ -638,7 +667,7 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast, updatesAsRetraction = false)
-    val dataStream = translateToCRow(optimizedPlan)
+    val dataStream = translateToCRow(optimizedPlan, qConf)
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bf4a8e0..9f50f0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -510,7 +510,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param sink The [[TableSink]] to write the [[Table]] to.
     * @tparam T The data type that the [[TableSink]] expects.
     */
-  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
+  private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
 
   /**
     * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index a70bcca..c3b5951 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -150,9 +150,50 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    toDataStream(table, clazz, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    toDataStream(table, typeInfo, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = {
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
   /**
@@ -168,12 +209,64 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the resulting [[DataStream]].
     * @return The converted [[DataStream]].
     */
-  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+  def toDataStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      qConfig: StreamQueryConfig): DataStream[T] = {
     TableEnvironment.validateType(typeInfo)
-    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+    translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](
+      table: Table,
+      clazz: Class[T]): DataStream[JTuple2[JBool, T]] = {
+
+    toRetractStream(table, clazz, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = {
+
+    toRetractStream(table, typeInfo, qConf)
   }
 
   /**
@@ -190,17 +283,21 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param clazz The class of the requested record type.
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
-  def toRetractStream[T](table: Table, clazz: Class[T]):
-    DataStream[JTuple2[JBool, T]] = {
+  def toRetractStream[T](
+      table: Table,
+      clazz: Class[T],
+      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     val typeInfo = TypeExtractor.createTypeInfo(clazz)
     TableEnvironment.validateType(typeInfo)
     val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
     translate[JTuple2[JBool, T]](
       table,
+      qConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultType)
   }
@@ -219,11 +316,14 @@ class StreamTableEnvironment(
     *
     * @param table The [[Table]] to convert.
     * @param typeInfo The [[TypeInformation]] of the requested record type.
+    * @param qConfig The configuration of the query to generate.
     * @tparam T The type of the requested record type.
     * @return The converted [[DataStream]].
     */
-  def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]):
-    DataStream[JTuple2[JBool, T]] = {
+  def toRetractStream[T](
+      table: Table,
+      typeInfo: TypeInformation[T],
+      qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
 
     TableEnvironment.validateType(typeInfo)
     val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
@@ -232,6 +332,7 @@ class StreamTableEnvironment(
     )
     translate[JTuple2[JBool, T]](
       table,
+      qConfig,
       updatesAsRetraction = true,
       withChangeFlag = true)(resultTypeInfo)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index e5ad6c2..56f7d55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -143,8 +143,29 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+    toDataStream(table, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = {
     val returnType = createTypeInformation[T]
-    asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+    asScalaStream(
+      translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
   }
 
 /**
@@ -159,8 +180,27 @@ class StreamTableEnvironment(
   * @return The converted [[DataStream]].
   */
   def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, 
T)] = {
+    toRetractStream(table, qConf)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract 
messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates 
a retract message.
+    *
+    * @param table The [[Table]] to convert.
+    * @param qConfig The configuration of the query to generate.
+    * @tparam T The type of the requested data type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T: TypeInformation](
+      table: Table,
+      qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
     val returnType = createTypeInformation[(Boolean, T)]
-    asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag 
= true)(returnType))
+    asScalaStream(
+      translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = 
true)(returnType))
   }
 
   /**

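A hedged sketch of how the new Scala overload might be called (`tEnv` and
`resultTable` are assumed to exist; Row is just one possible record type).
Non-windowed group aggregates emit updates, so the table is read as a retract
stream, and the StreamQueryConfig enables the idle-state clean-up added by
this commit:

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.api.scala._  // implicit TypeInformation
    import org.apache.flink.streaming.api.scala.DataStream
    import org.apache.flink.types.Row

    // The Boolean flag marks each record as an add (true) or retract (false).
    val retracts: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](
      resultTable, tEnv.qConf.setIdleStateRetentionTime(Time.hours(1)))
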
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 5efff62..966b42f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.{Table, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
 
@@ -57,6 +57,21 @@ class TableConversions(table: Table) {
     }
   }
 
+  /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+    *
+    * @param qConfig The configuration for the generated query.
+    */
+  def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = {
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toDataStream(table, qConfig)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
   /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
     * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
     * the second field holds the record of the specified type [[T]].
@@ -76,5 +91,28 @@ class TableConversions(table: Table) {
     }
   }
 
+  /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * @param qConfig The configuration for the generated query.
+    */
+  def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toRetractStream(table, qConfig)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
+
+
 }
 

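With the implicit conversion to TableConversions in scope (via
org.apache.flink.table.api.scala._), the same overloads are available directly
on the [[Table]]; a sketch under the same assumptions as the example above:

    import org.apache.flink.table.api.scala._  // implicit Table -> TableConversions

    // Equivalent to tEnv.toRetractStream(resultTable, qConfig).
    val stream: DataStream[(Boolean, Row)] = resultTable.toRetractStream[Row](
      tEnv.qConf.setIdleStateRetentionTime(Time.minutes(30)))
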
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 310a75f..5a2eb1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -763,6 +763,30 @@ class Table(
     * @tparam T The data type that the [[TableSink]] expects.
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
+
+    def qConfig = this.tableEnv match {
+      case s: StreamTableEnvironment => s.qConf
+      case b: BatchTableEnvironment => new BatchQueryConfig
+      case _ => null
+    }
+
+    writeToSink(sink, qConfig)
+  }
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param sink The [[TableSink]] to which the [[Table]] is written.
+    * @param conf The configuration for the query that writes to the sink.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
     // get schema information of table
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -773,7 +797,7 @@ class Table(
     val configuredSink = sink.configure(fieldNames, fieldTypes)
 
     // emit the table to the configured table sink
-    tableEnv.writeToSink(this, configuredSink)
+    tableEnv.writeToSink(this, configuredSink, conf)
   }
 
   /**

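A sketch of emitting a table through the new two-argument writeToSink (the
CsvTableSink path and delimiter are placeholders). The config is handed from
Table.writeToSink to TableEnvironment.writeToSink and, for streaming queries,
down to the generated DataStream operators:

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.table.sinks.CsvTableSink

    resultTable.writeToSink(
      new CsvTableSink("/tmp/results.csv", ","),
      tEnv.qConf.setIdleStateRetentionTime(Time.hours(6)))
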
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index ce0f966..0e377b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
@@ -83,11 +83,13 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
 
     val generator = new CodeGenerator(config, false, inputRowType)

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 19ad89b..cbd818a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -82,12 +82,14 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
-    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 506c0cb..f01b24a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -21,9 +21,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -31,6 +32,7 @@ import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
 
 /**
   *
@@ -59,6 +61,8 @@ class DataStreamGroupAggregate(
     with CommonAggregate
     with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType() = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -100,9 +104,18 @@ class DataStreamGroupAggregate(
         inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    if (qConfig.getMinIdleStateRetentionTime < 0 || 
qConfig.getMaxIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates 
state. " +
+        "Please provide a query configuration with valid retention interval to 
prevent excessive " +
+          "state size. You may specify a retention time of 0 to not clean up 
the state.")
+    }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
qConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
@@ -136,6 +149,7 @@ class DataStreamGroupAggregate(
       inputSchema.logicalType,
       inputSchema.physicalFieldTypeInfo,
       groupings,
+      qConfig,
       DataStreamRetractionRules.isAccRetract(this),
       DataStreamRetractionRules.isAccRetract(getInput))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ef207b0..d2aaad0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
@@ -107,9 +107,11 @@ class DataStreamGroupWindowAggregate(
           namedProperties))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 4061242..8e97884 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.plan.schema.RowSchema
@@ -88,7 +88,10 @@ class DataStreamOverAggregate(
           namedAggregates))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     if (logicWindow.groups.size > 1) {
       throw new TableException(
         "Unsupported use of OVER windows. All aggregates must be computed on 
the same window.")
@@ -109,7 +112,7 @@ class DataStreamOverAggregate(
         "Unsupported use of OVER windows. The window can only be ordered in 
ASCENDING mode.")
     }
 
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
 
     val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 9754de4..6f6edf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.table.runtime.types.CRow
 
@@ -29,9 +29,12 @@ trait DataStreamRel extends FlinkRelNode {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
+    * @param qConfig The configuration for the query to generate.
     * @return DataStream of type [[CRow]]
     */
-  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
+  def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    qConfig: StreamQueryConfig): DataStream[CRow]
 
   /**
    * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c613646..e64bf0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.schema.DataStreamTable
 import org.apache.flink.table.runtime.types.CRow
@@ -54,7 +54,10 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     convertToInternalRow(schema, inputDataStream, dataStreamTable, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 654c259..6cc7396 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 
@@ -58,10 +58,12 @@ class DataStreamUnion(
     s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
     leftDataSet.union(rightDataSet)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 32c9aaf..ba6b025 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.io.CRowValuesInputFormat
@@ -56,7 +56,9 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index b2d7019..225f23f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
@@ -98,7 +98,10 @@ class StreamTableSourceScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      qConfig: StreamQueryConfig): DataStream[CRow] = {
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
     convertToInternalRow(

http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 768c9cb..27392c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
@@ -155,6 +155,7 @@ object AggregateUtil {
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
       groupings: Array[Int],
+      qConfig: StreamQueryConfig,
       generateRetraction: Boolean,
       consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
@@ -190,7 +191,8 @@ object AggregateUtil {
     new GroupAggProcessFunction(
       genFunction,
       aggregationStateType,
-      generateRetraction)
+      generateRetraction,
+      qConfig)
 
   }
 

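Note: the new qConfig parameter is simply threaded from the planner into the
runtime function. A hedged sketch of the updated call site (the arguments
before qConfig follow the hunk above; generator and namedAggregates are assumed
in scope, and the method name is taken from this file):

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.table.runtime.types.CRow

    // Illustrative call site; generator and namedAggregates are assumed in scope.
    val processFunction: ProcessFunction[CRow, CRow] =
      AggregateUtil.createGroupAggregateFunction(
        generator,          // code generator
        namedAggregates,    // the aggregate calls
        inputRowType,
        inputFieldTypes,
        groupings,
        qConfig,            // new: carries the idle state retention times
        generateRetraction = true,
        consumeRetraction = false)
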
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 6ee37e6..84fee87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,9 +26,9 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 import org.apache.flink.table.runtime.types.CRow
 
 /**
@@ -40,13 +40,20 @@ import org.apache.flink.table.runtime.types.CRow
 class GroupAggProcessFunction(
     private val genAggregations: GeneratedAggregationsFunction,
     private val aggregationStateType: RowTypeInfo,
-    private val generateRetraction: Boolean)
+    private val generateRetraction: Boolean,
+    private val qConfig: StreamQueryConfig)
   extends ProcessFunction[CRow, CRow]
     with Compiler[GeneratedAggregations] {
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
+  private val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+  private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+  private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+  // interval in which clean-up timers are registered
+  private val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
   private var newRow: CRow = _
   private var prevRow: CRow = _
   private var firstRow: Boolean = _
@@ -54,6 +61,8 @@ class GroupAggProcessFunction(
   private var state: ValueState[Row] = _
   // counts the number of added and retracted input records
   private var cntState: ValueState[JLong] = _
+  // holds the latest registered cleanup timer
+  private var cleanupTimeState: ValueState[JLong] = _
 
   override def open(config: Configuration) {
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -74,6 +83,12 @@ class GroupAggProcessFunction(
     val inputCntDescriptor: ValueStateDescriptor[JLong] =
       new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
     cntState = getRuntimeContext.getState(inputCntDescriptor)
+
+    if (stateCleaningEnabled) {
+      val inputCntDescriptor: ValueStateDescriptor[JLong] =
+        new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG)
+      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
+    }
   }
 
   override def processElement(
@@ -81,6 +96,23 @@ class GroupAggProcessFunction(
       ctx: ProcessFunction[CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
+    if (stateCleaningEnabled) {
+
+      val currentTime = ctx.timerService().currentProcessingTime()
+      val earliestCleanup = currentTime + minRetentionTime
+
+      // last registered timer
+      val lastCleanupTime = cleanupTimeState.value()
+
+      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
+        // we need to register a new timer
+        val cleanupTime = earliestCleanup + cleanupTimerInterval
+        // register timer and remember clean-up time
+        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+        cleanupTimeState.update(cleanupTime)
+      }
+    }
+
     val input = inputC.row
 
     // get accumulators and input counter
@@ -144,4 +176,18 @@ class GroupAggProcessFunction(
       cntState.clear()
     }
   }
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      out: Collector[CRow]): Unit = {
+
+    if (timestamp == cleanupTimeState.value()) {
+      // clear all state
+      this.state.clear()
+      this.cntState.clear()
+      this.cleanupTimeState.clear()
+    }
+  }
+
 }

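Note: taken together, clean-up is active only if both retention times are
configured (greater than 1 ms above). A timer fires maxRetentionTime after a
record (earliestCleanup + cleanupTimerInterval = currentTime + max), and a new
timer is registered at most once per (max - min) interval, which bounds the
timer churn per key. In onTimer, only a timer whose timestamp equals the
remembered clean-up time clears the state; stale timers from earlier
registrations fire and are ignored. A standalone sketch of the registration
rule (illustrative only; names mirror the fields above):

    // Simulates the timer-registration rule of GroupAggProcessFunction.
    object CleanupTimerSketch extends App {
      val minRetentionTime: Long = 12L * 3600 * 1000 // 12h, from the query config
      val maxRetentionTime: Long = 24L * 3600 * 1000 // 24h, from the query config
      val cleanupTimerInterval: Long = maxRetentionTime - minRetentionTime // 12h

      var lastCleanupTime: Option[Long] = None

      // called per record of a key; returns the clean-up timer to register, if any
      def onRecord(currentTime: Long): Option[Long] = {
        val earliestCleanup = currentTime + minRetentionTime
        if (lastCleanupTime.forall(last => earliestCleanup >= last + cleanupTimerInterval)) {
          val cleanupTime = earliestCleanup + cleanupTimerInterval
          lastCleanupTime = Some(cleanupTime) // cleanupTimeState.update(cleanupTime)
          Some(cleanupTime)                   // registerProcessingTimeTimer(cleanupTime)
        } else {
          None // the previously registered timer still covers this record
        }
      }

      val h = 3600 * 1000L
      println(onRecord(0))      // Some(86400000): first record, timer at t + max
      println(onRecord(1 * h))  // None: still covered by the last timer
      println(onRecord(25 * h)) // Some(176400000): one interval past the last timer
    }
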
http://git-wip-us.apache.org/repos/asf/flink/blob/d16339db/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8626b07..3d79e22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,16 +18,17 @@
 
 package org.apache.flink.table.utils
 
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.RuleSet
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 
 class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+  override private[flink] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      qConfig: QueryConfig): Unit = ???
 
   override protected def checkValidTableName(name: String): Unit = ???
 
