[FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27bf4cab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27bf4cab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27bf4cab

Branch: refs/heads/master
Commit: 27bf4cab7141ccbc7e8effe03559c50bbb3f9707
Parents: dc54abc
Author: Hequn Cheng <chenghe...@gmail.com>
Authored: Tue Apr 18 16:54:09 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat May 6 01:51:55 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  63 +++++-
 .../table/api/StreamTableEnvironment.scala      |  64 +++++-
 .../flink/table/api/TableEnvironment.scala      |  65 +++---
 .../flink/table/plan/nodes/CommonCalc.scala     |  32 ++-
 .../table/plan/nodes/CommonCorrelate.scala      | 139 +++++--------
 .../flink/table/plan/nodes/CommonScan.scala     |   7 +-
 .../table/plan/nodes/dataset/BatchScan.scala    |   2 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  16 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  29 ++-
 .../plan/nodes/dataset/DataSetValues.scala      |   2 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  26 +--
 .../nodes/datastream/DataStreamCorrelate.scala  |  40 +++-
 .../datastream/DataStreamGroupAggregate.scala   |  24 ++-
 .../DataStreamGroupWindowAggregate.scala        |  52 +++--
 .../datastream/DataStreamOverAggregate.scala    |  72 ++++---
 .../plan/nodes/datastream/DataStreamRel.scala   |   6 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   7 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   4 +-
 .../nodes/datastream/DataStreamValues.scala     |  13 +-
 .../plan/nodes/datastream/StreamScan.scala      |  27 ++-
 .../datastream/StreamTableSourceScan.scala      |   8 +-
 .../plan/nodes/logical/FlinkLogicalCalc.scala   |   2 +-
 .../datastream/DataStreamRetractionRules.scala  |  16 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |  83 ++++++++
 .../flink/table/runtime/CRowFlatMapRunner.scala |  72 +++++++
 .../table/runtime/CRowInputMapRunner.scala      |  57 ++++++
 .../table/runtime/CRowOutputMapRunner.scala     |  60 ++++++
 .../table/runtime/CRowWrappingCollector.scala   |  41 ++++
 .../flink/table/runtime/FlatMapRunner.scala     |  17 +-
 .../aggregate/AggregateAggFunction.scala        |  15 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  48 +++--
 ...SetSessionWindowAggReduceGroupFunction.scala |   4 +-
 ...taSetSlideWindowAggReduceGroupFunction.scala |   4 +-
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   4 +-
 .../aggregate/GroupAggProcessFunction.scala     |  58 ++++--
 ...rementalAggregateAllTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateAllWindowFunction.scala |  11 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateWindowFunction.scala    |  13 +-
 .../aggregate/ProcTimeBoundedRangeOver.scala    |  30 +--
 .../aggregate/ProcTimeBoundedRowsOver.scala     |  26 ++-
 .../ProcTimeUnboundedNonPartitionedOver.scala   |  23 ++-
 .../ProcTimeUnboundedPartitionedOver.scala      |  22 +-
 .../aggregate/RowTimeBoundedRangeOver.scala     |  36 ++--
 .../aggregate/RowTimeBoundedRowsOver.scala      |  30 +--
 .../aggregate/RowTimeUnboundedOver.scala        |  50 ++---
 .../aggregate/TimeWindowPropertyCollector.scala |  34 ++-
 .../runtime/io/CRowValuesInputFormat.scala      |  59 ++++++
 .../table/runtime/io/ValuesInputFormat.scala    |  17 +-
 .../apache/flink/table/runtime/types/CRow.scala |  55 +++++
 .../table/runtime/types/CRowComparator.scala    |  83 ++++++++
 .../table/runtime/types/CRowSerializer.scala    |  78 +++++++
 .../table/runtime/types/CRowTypeInfo.scala      |  98 +++++++++
 .../apache/flink/table/sinks/CsvTableSink.scala |   2 +
 .../flink/table/TableEnvironmentTest.scala      |  72 ++++++-
 .../scala/batch/TableEnvironmentITCase.scala    |  20 ++
 .../api/scala/stream/RetractionITCase.scala     | 205 +++++++++++++++++++
 .../api/scala/stream/TableSinkITCase.scala      |   2 +-
 .../api/scala/stream/utils/StreamITCase.scala   |  11 +-
 ...ProcessingOverRangeProcessFunctionTest.scala | 105 +++++-----
 .../runtime/types/CRowComparatorTest.scala      |  61 ++++++
 .../table/utils/MockTableEnvironment.scala      |   8 +
 62 files changed, 1813 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
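
At the core of this commit is the new CRow ("change row") type added under org.apache.flink.table.runtime.types: a thin wrapper around a Row plus a boolean change flag that marks each record as an accumulate (true) or retract (false) message. Below is a minimal, dependency-free sketch of the idea; the SketchCRow/CRowSketch names are ours, while the real class, type info, serializer, and comparator appear in the diff that follows.

// Sketch only: mirrors the shape of org.apache.flink.table.runtime.types.CRow.
// change = true  -> accumulate message (add or update a result)
// change = false -> retract message (withdraw a previously emitted result)
class SketchCRow(var row: Seq[Any], var change: Boolean) {
  override def toString: String =
    (if (change) "+" else "-") + row.mkString("(", ", ", ")")
}

object CRowSketch extends App {
  // Updating user1's count from 1 to 2 travels as retract + accumulate:
  Seq(
    new SketchCRow(Seq("user1", 1L), change = true),
    new SketchCRow(Seq("user1", 1L), change = false),
    new SketchCRow(Seq("user1", 2L), change = true)
  ).foreach(println) // +(user1, 1)  -(user1, 1)  +(user1, 2)
}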


http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 02c6063..f7955f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,16 +26,20 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -128,6 +132,56 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Must not be unique but has to be a
+    *                     valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // Row to Row, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // Row to CRow, only needs to be wrapped
+      Some(
+        new RichMapFunction[Row, CRow] {
+          private var outCRow: CRow = _
+          override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
+          override def map(value: Row): CRow = {
+            outCRow.row = value
+            outCRow
+          }
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // some type that is neither Row nor CRow
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      val mapFunction = new MapRunner[IN, OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+
+      Some(mapFunction)
+    }
+  }
+
+  /**
    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
@@ -293,10 +347,15 @@ abstract class BatchTableEnvironment(
     logicalPlan match {
       case node: DataSetRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[Row, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataSet[A]]
         }
 
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index dd2c09d..0632a47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -37,7 +38,9 @@ import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, Rowtim
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable}
+import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
@@ -132,6 +135,52 @@ abstract class StreamTableEnvironment(
     }
   }
 
+
+  /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Must not be unique but has to be a
+    *                     valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+  Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // CRow to CRow, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // CRow to Row, only needs to be unwrapped
+      Some(
+        new MapFunction[CRow, Row] {
+          override def map(value: CRow): Row = value.row
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // Some type that is neither CRow nor Row
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      Some(new CRowInputMapRunner[OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+        .asInstanceOf[MapFunction[IN, OUT]])
+
+    }
+  }
+
   /**
    * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
     * catalog.
@@ -377,10 +426,15 @@ abstract class StreamTableEnvironment(
     logicalPlan match {
       case node: DataStreamRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[CRow, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataStream[A]]
         }
 
       case _ =>
@@ -398,9 +452,9 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](
+    val dataStream = translate[CRow](
       optimizedPlan,
-      ast.getRowType)(new GenericTypeInfo(classOf[Row]))
+      ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 9ed5000..d27db1e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
@@ -59,7 +59,7 @@ import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
@@ -644,6 +644,18 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
         }
+      case cr: CRowTypeInfo =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = cr.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $cr")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression 
expected.")
+        }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
@@ -694,37 +706,38 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Creates a final converter that maps the internal row type to external type.
     *
-    * @param physicalRowTypeInfo the input of the sink
+    * @param physicalTypeInfo the input of the sink
     * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
     * @param functionName name of the map function. Must not be unique but has to be a
     *                     valid Java class identifier.
     */
-  protected def sinkConversion[T](
-      physicalRowTypeInfo: TypeInformation[Row],
+  protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]]
+
+  protected def generateRowConverterFunction[OUT](
+      inputTypeInfo: TypeInformation[Row],
       logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[T],
-      functionName: String)
-    : Option[MapFunction[Row, T]] = {
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    GeneratedFunction[MapFunction[Row, OUT], OUT] = {
 
     // validate that at least the field types of physical and logical type match
     // we do that here to make sure that plan translation was correct
     val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
-    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+    if (logicalRowTypeInfo != inputTypeInfo) {
       throw TableException("The field types of physical and logical row types do not match." +
         "This is a bug and should not happen. Please file an issue.")
     }
 
-    // requested type is a generic Row, no conversion needed
-    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
-          requestedTypeInfo.getTypeClass == classOf[Row]) {
-      return None
-    }
-
     // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
-      FlinkTypeFactory.toTypeInfo(relDataType.getType)
-    }
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala
+      .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
+
     // field names
     val logicalFieldNames = logicalRowType.getFieldNames.asScala
 
@@ -732,8 +745,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
       throw new TableException("Arity of result does not match requested 
type.")
     }
-    requestedTypeInfo match {
 
+    requestedTypeInfo match {
       // POJO type requested
       case pt: PojoTypeInfo[_] =>
         logicalFieldNames.zip(logicalFieldTypes) foreach {
@@ -780,7 +793,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     val generator = new CodeGenerator(
       config,
       false,
-      physicalRowTypeInfo,
+      inputTypeInfo,
       None,
       None)
 
@@ -794,20 +807,12 @@ abstract class TableEnvironment(val config: TableConfig) {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       functionName,
-      classOf[MapFunction[Row, T]],
+      classOf[MapFunction[Row, OUT]],
       body,
       requestedTypeInfo)
-
-    val mapFunction = new MapRunner[Row, T](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-    Some(mapFunction)
   }
-
 }
 
 /**
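
A note on the refactoring above: generateRowConverterFunction now returns only the GeneratedFunction (name, code, return type), and each environment wraps it in its own runner (MapRunner for DataSets, CRowInputMapRunner for DataStreams, both visible elsewhere in this diff). The runners materialize the generated source once per task in open(). A rough, dependency-free sketch of that pattern; the compile step is faked by a caller-supplied factory where the real runners use generated-code compilation.

// Sketch of the runner pattern behind MapRunner / CRowInputMapRunner.
trait MapFn[I, O] { def map(in: I): O }

class SketchMapRunner[I, O](
    name: String,
    code: String, // generated source code shipped to the task
    compile: String => MapFn[I, O]) extends MapFn[I, O] {
  private var function: MapFn[I, O] = _
  // the real runners compile `code` here, once per task, in open()
  def open(): Unit = function = compile(code)
  override def map(in: I): O = function.map(in)
}

object RunnerSketch extends App {
  val runner = new SketchMapRunner[Int, String](
    "DataSetSinkConversion",
    "return String.valueOf(in);", // stand-in for generated Java source
    _ => new MapFn[Int, String] { def map(in: Int): String = "value " + in })
  runner.open()
  println(runner.map(42)) // value 42
}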

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index ff5ffb2..e875587 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -20,26 +20,26 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
 import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait CommonCalc {
+trait CommonCalc[T] {
 
-  private[flink] def functionBody(
+  private[flink] def generateFunction(
       generator: CodeGenerator,
+      ruleDescription: String,
       inputSchema: RowSchema,
       returnSchema: RowSchema,
       calcProgram: RexProgram,
-      config: TableConfig)
-    : String = {
+      config: TableConfig):
+    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val expandedExpressions = calcProgram
       .getProjectList
@@ -61,7 +61,7 @@ trait CommonCalc {
       expandedExpressions)
 
     // only projection
-    if (condition == null) {
+    val body = if (condition == null) {
       s"""
         |${projection.code}
         |${generator.collectorTerm}.collect(${projection.resultTerm});
@@ -89,16 +89,12 @@ trait CommonCalc {
           |""".stripMargin
       }
     }
-  }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
-    : RichFlatMapFunction[Row, Row] = {
 
-    new FlatMapRunner[Row, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
+    generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      body,
+      returnSchema.physicalTypeInfo)
   }
 
   private[flink] def conditionToString(
@@ -168,8 +164,8 @@ trait CommonCalc {
     // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
     // in normalization stage. So we should ignore CASTs here in optimization stage.
     val compCnt = calcProgram.getExprList.asScala.toList.count {
-      case i: RexInputRef => false
-      case l: RexLiteral => false
+      case _: RexInputRef => false
+      case _: RexLiteral => false
       case c: RexCall if c.getOperator.getName.equals("CAST") => false
       case _ => true
     }
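
CommonCalc.generateFunction now assembles the complete FlatMapFunction body itself: an unconditional project-and-collect when the RexProgram carries no condition, otherwise evaluate the condition and only project and collect records that pass. A tiny sketch of the runtime shape of that generated body (all names here are illustrative, not from the commit):

// Per record: evaluate an optional condition, then project and collect.
object CalcSketch extends App {
  type Row = Seq[Any]

  def flatMap(condition: Option[Row => Boolean], project: Row => Row)
             (in: Row, collect: Row => Unit): Unit =
    if (condition.forall(_(in))) collect(project(in))

  val adultsOnly: Option[Row => Boolean] = Some(r => r(1).asInstanceOf[Int] >= 18)
  val nameOnly: Row => Row = r => Seq(r(0))

  flatMap(adultsOnly, nameOnly)(Seq("alice", 30), println) // List(alice)
  flatMap(adultsOnly, nameOnly)(Seq("bob", 12), println)   // filtered out
}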

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 02305ee..44a109e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -36,72 +36,27 @@ import scala.collection.JavaConverters._
 /**
   * Join a user-defined table function
   */
-trait CommonCorrelate {
-
-  /**
-    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-    * and user-defined table function.
-    */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputSchema: RowSchema,
-      udtfTypeInfo: TypeInformation[Any],
-      returnSchema: RowSchema,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Row, Row] = {
-
-    val flatMap = generateFunction(
-      config,
-      inputSchema.physicalTypeInfo,
-      udtfTypeInfo,
-      returnSchema.physicalTypeInfo,
-      returnSchema.logicalFieldNames,
-      joinType,
-      inputSchema.mapRexNode(rexCall).asInstanceOf[RexCall],
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputSchema.physicalTypeInfo,
-      udtfTypeInfo,
-      returnSchema.physicalTypeInfo,
-      returnSchema.logicalFieldNames,
-      condition.map(inputSchema.mapRexNode),
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Row, Row](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
+trait CommonCorrelate[T] {
 
   /**
     * Generates the flat map function to run the user-defined table function.
     */
-  private def generateFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Row],
-      resultFieldNames: Seq[String],
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+  private[flink] def generateFunction(
+    config: TableConfig,
+    inputSchema: RowSchema,
+    udtfTypeInfo: TypeInformation[Any],
+    returnSchema: RowSchema,
+    rowType: RelDataType,
+    joinType: SemiJoinType,
+    rexCall: RexCall,
+    pojoFieldMapping: Option[Array[Int]],
+    ruleDescription: String):
+  GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val functionGenerator = new CodeGenerator(
       config,
       false,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       Some(udtfTypeInfo),
       None,
       pojoFieldMapping)
@@ -115,9 +70,9 @@ trait CommonCorrelate {
     val call = functionGenerator.generateExpression(rexCall)
     var body =
       s"""
-        |${call.resultTerm}.setCollector($collectorTerm);
-        |${call.code}
-        |""".stripMargin
+         |${call.resultTerm}.setCollector($collectorTerm);
+         |${call.code}
+         |""".stripMargin
 
     if (joinType == SemiJoinType.LEFT) {
       // left outer join
@@ -132,15 +87,17 @@ trait CommonCorrelate {
           x.resultType)
       }
       val outerResultExpr = functionGenerator.generateResultExpression(
-        input1AccessExprs ++ input2NullExprs, returnType, resultFieldNames)
+        input1AccessExprs ++ input2NullExprs,
+        returnSchema.physicalTypeInfo,
+        rowType.getFieldNames.asScala)
       body +=
         s"""
-          |boolean hasOutput = $collectorTerm.isCollected();
-          |if (!hasOutput) {
-          |  ${outerResultExpr.code}
-          |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
-          |}
-          |""".stripMargin
+           |boolean hasOutput = $collectorTerm.isCollected();
+           |if (!hasOutput) {
+           |  ${outerResultExpr.code}
+           |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+           |}
+           |""".stripMargin
     } else if (joinType != SemiJoinType.INNER) {
       throw TableException(s"Unsupported SemiJoinType: $joinType for correlate 
join.")
     }
@@ -149,26 +106,26 @@ trait CommonCorrelate {
       ruleDescription,
       classOf[FlatMapFunction[Row, Row]],
       body,
-      returnType)
+      returnSchema.physicalTypeInfo)
   }
 
   /**
     * Generates table function collector.
     */
   private[flink] def generateCollector(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Row],
-      resultFieldNames: Seq[String],
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedCollector = {
+    config: TableConfig,
+    inputSchema: RowSchema,
+    udtfTypeInfo: TypeInformation[Any],
+    returnSchema: RowSchema,
+    rowType: RelDataType,
+    condition: Option[RexNode],
+    pojoFieldMapping: Option[Array[Int]])
+  : GeneratedCollector = {
 
     val generator = new CodeGenerator(
       config,
       false,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       Some(udtfTypeInfo),
       None,
       pojoFieldMapping)
@@ -177,26 +134,26 @@ trait CommonCorrelate {
 
     val crossResultExpr = generator.generateResultExpression(
       input1AccessExprs ++ input2AccessExprs,
-      returnType,
-      resultFieldNames)
+      returnSchema.physicalTypeInfo,
+      rowType.getFieldNames.asScala)
 
     val collectorCode = if (condition.isEmpty) {
       s"""
-        |${crossResultExpr.code}
-        |getCollector().collect(${crossResultExpr.resultTerm});
-        |""".stripMargin
+         |${crossResultExpr.code}
+         |getCollector().collect(${crossResultExpr.resultTerm});
+         |""".stripMargin
     } else {
      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping)
       filterGenerator.input1Term = filterGenerator.input2Term
       val filterCondition = filterGenerator.generateExpression(condition.get)
       s"""
-        |${filterGenerator.reuseInputUnboxingCode()}
-        |${filterCondition.code}
-        |if (${filterCondition.resultTerm}) {
-        |  ${crossResultExpr.code}
-        |  getCollector().collect(${crossResultExpr.resultTerm});
-        |}
-        |""".stripMargin
+         |${filterGenerator.reuseInputUnboxingCode()}
+         |${filterCondition.code}
+         |if (${filterCondition.resultTerm}) {
+         |  ${crossResultExpr.code}
+         |  getCollector().collect(${crossResultExpr.resultTerm});
+         |}
+         |""".stripMargin
     }
 
     generator.generateTableFunctionCollector(

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 091a1ea..7ce73ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan {
+trait CommonScan[T] {
 
   /**
     * We check if the input type is exactly the same as the internal row type.
@@ -35,11 +35,8 @@ trait CommonScan {
     */
   private[flink] def needsConversion(
       externalTypeInfo: TypeInformation[Any],
-      internalTypeInfo: TypeInformation[Row])
-    : Boolean = {
-
+      internalTypeInfo: TypeInformation[T]): Boolean =
     externalTypeInfo != internalTypeInfo
-  }
 
   private[flink] def generatedConversionFunction[F <: Function](
       config: TableConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index cc5d9fb..95707b8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -30,7 +30,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait BatchScan extends CommonScan with DataSetRel {
+trait BatchScan extends CommonScan[Row] with DataSetRel {
 
   protected def convertToInternalRow(
       input: DataSet[Any],

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 5274fa1..e340a8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,7 +24,6 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
@@ -47,7 +46,7 @@ class DataSetCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[Row]
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -88,22 +87,17 @@ class DataSetCalc(
 
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
+      ruleDescription,
       new RowSchema(getInput.getRowType),
       new RowSchema(getRowType),
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      rowTypeInfo)
-
-    val runner = calcMapFunction(genFunction)
+    val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
 
     inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 6c79b45..49ead26 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CorrelateFlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -48,7 +49,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with CommonCorrelate
+  with CommonCorrelate[Row]
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -98,22 +99,38 @@ class DataSetCorrelate(
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
-    val mapFunc = correlateMapFunction(
+    val flatMap = generateFunction(
       config,
       new RowSchema(getInput.getRowType),
       udtfTypeInfo,
       new RowSchema(getRowType),
+      rowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
+    val collector = generateCollector(
+      config,
+      new RowSchema(getInput.getRowType),
+      udtfTypeInfo,
+      new RowSchema(getRowType),
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+
     inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 3ebee2c..948dd27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -88,7 +88,7 @@ class DataSetValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new ValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index c6c25c0..59f723ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -24,13 +24,13 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
-import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * Flink RelNode which matches along with FlatMapOperator.
@@ -45,7 +45,7 @@ class DataStreamCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[CRow]
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = schema.logicalType
@@ -83,28 +83,28 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
 
-    val generator = new CodeGenerator(config, false, inputDataStream.getType)
+    val generator = new CodeGenerator(config, false, inputRowType)
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
+      ruleDescription,
       inputSchema,
       schema,
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      schema.physicalTypeInfo)
+    val mapFunc = new CRowFlatMapRunner(
+      genFunction.name,
+      genFunction.code,
+      CRowTypeInfo(schema.physicalTypeInfo))
 
-    val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 899d8ef..19ad89b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -28,7 +28,8 @@ import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * Flink RelNode which matches along with join a user defined table function.
@@ -45,7 +46,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with CommonCorrelate
+  with CommonCorrelate[CRow]
   with DataStreamRel {
 
   override def deriveRowType() = schema.logicalType
@@ -81,31 +82,54 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
     val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val mapFunc = correlateMapFunction(
+    val flatMap = generateFunction(
       config,
       inputSchema,
       udtfTypeInfo,
       schema,
+      getRowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
+    val collector = generateCollector(
+      config,
+      inputSchema,
+      udtfTypeInfo,
+      schema,
+      getRowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CRowCorrelateFlatMapRunner(
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      CRowTypeInfo(flatMap.returnType))
+
+    val inputParallelism = inputDS.getParallelism
+
+    inputDS
+      .flatMap(mapFunc)
+      // preserve input parallelism to ensure that acc and retract messages remain in order
+      .setParallelism(inputParallelism)
+      .name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
   }
 
 }
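
The setParallelism call above deserves a remark: the correlate operator is pinned to its input's parallelism so that an accumulate message and the retraction that later cancels it cannot be routed through different parallel instances and overtake each other. A small sketch of what goes wrong for a naive downstream consumer when a retraction arrives late (the tuple encoding is ours, for illustration only):

// (key, value, change): change = true accumulates, false retracts.
object OrderingSketch extends App {
  type Msg = (String, Long, Boolean)

  // Naive upsert sink: accumulate inserts/overwrites, retract deletes the key.
  def applyAll(messages: Seq[Msg]): Map[String, Long] =
    messages.foldLeft(Map.empty[String, Long]) {
      case (state, (k, v, true))  => state.updated(k, v)
      case (state, (k, _, false)) => state - k
    }

  val inOrder   = Seq(("user1", 1L, true), ("user1", 1L, false), ("user1", 2L, true))
  val reordered = Seq(("user1", 1L, true), ("user1", 2L, true), ("user1", 1L, false))

  println(applyAll(inOrder))   // Map(user1 -> 2): the intended final result
  println(applyAll(reordered)) // Map(): the late retraction wiped the update
}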

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 3555c80..056cda9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -28,8 +28,9 @@ import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.types.Row
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   *
@@ -97,15 +98,18 @@ class DataStreamGroupAggregate(
         inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
         inputSchema.mapAggregateCall(namedAggregate.left),
         namedAggregate.right)
     }
 
+    val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
+
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
@@ -129,28 +133,30 @@ class DataStreamGroupAggregate(
       physicalNamedAggregates,
       inputSchema.logicalType,
       inputSchema.physicalFieldTypeInfo,
-      groupings)
+      groupings,
+      DataStreamRetractionRules.isAccRetract(this),
+      DataStreamRetractionRules.isAccRetract(getInput))
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // grouped / keyed aggregation
       if (physicalGrouping.nonEmpty) {
         inputDS
         .keyBy(groupings: _*)
         .process(processFunction)
-        .returns(schema.physicalTypeInfo)
+        .returns(outRowType)
         .name(keyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
       // global / non-keyed aggregation
       else {
         inputDS
-        .keyBy(new NullByteKeySelector[Row])
+        .keyBy(new NullByteKeySelector[CRow])
         .process(processFunction)
         .setParallelism(1)
         .setMaxParallelism(1)
-        .returns(schema.physicalTypeInfo)
+        .returns(outRowType)
         .name(nonKeyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
     result
   }
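
DataStreamGroupAggregate now passes two retraction flags to AggregateUtil.createGroupAggregateFunction: whether this node must generate retractions (isAccRetract(this)) and whether its input already delivers them (isAccRetract(getInput)). For a query like SELECT user, COUNT(*) FROM clicks GROUP BY user, the observable effect is that every updated result is preceded by a retraction of the previous one. A dependency-free sketch of that emission pattern, modeled on (but not copied from) GroupAggProcessFunction:

// Emission pattern of a retracting, non-windowed COUNT(*) GROUP BY (sketch).
object GroupAggSketch extends App {
  case class CRowMsg(user: String, count: Long, change: Boolean)

  private var counts = Map.empty[String, Long]

  def process(user: String): Seq[CRowMsg] = {
    val old = counts.getOrElse(user, 0L)
    counts = counts.updated(user, old + 1)
    // retract the previous result (if any), then emit the updated one
    val retraction = if (old > 0) Seq(CRowMsg(user, old, change = false)) else Nil
    retraction :+ CRowMsg(user, old + 1, change = true)
  }

  Seq("user1", "user2", "user1").flatMap(process).foreach(println)
  // CRowMsg(user1,1,true)
  // CRowMsg(user2,1,true)
  // CRowMsg(user1,1,false)  <- previous count for user1 retracted
  // CRowMsg(user1,2,true)
}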

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ea4b0bf..f61828b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
@@ -34,10 +34,12 @@ import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.types.Row
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -101,14 +103,25 @@ class DataStreamGroupWindowAggregate(
           namedProperties))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
     val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
       new CalcitePair[AggregateCall, String](
         inputSchema.mapAggregateCall(namedAggregate.left),
         namedAggregate.right)
     }
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on windowed GroupBy aggregation is not supported yet. " +
+          "Note: Windowed GroupBy aggregation should not follow a " +
+          "non-windowed GroupBy aggregation.")
+    }
+
+    val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
 
     val aggString = aggregationToString(
       inputSchema.logicalType,
@@ -145,7 +158,7 @@ class DataStreamGroupWindowAggregate(
       val keyedStream = inputDS.keyBy(physicalGrouping: _*)
       val windowedStream =
         createKeyedWindowedStream(window, keyedStream)
-          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+          .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
@@ -154,15 +167,11 @@ class DataStreamGroupWindowAggregate(
           inputSchema.physicalType,
           inputSchema.physicalFieldTypeInfo,
           schema.physicalType,
+          physicalGrouping,
           needMerge)
 
       windowedStream
-        .aggregate(
-          aggFunction,
-          windowFunction,
-          accumulatorRowType,
-          aggResultRowType,
-          schema.physicalTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(keyedAggOpName)
     }
     // global / non-keyed aggregation
@@ -174,7 +183,7 @@ class DataStreamGroupWindowAggregate(
 
       val windowedStream =
         createNonKeyedWindowedStream(window, inputDS)
-          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+          .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
@@ -183,15 +192,11 @@ class DataStreamGroupWindowAggregate(
           inputSchema.physicalType,
           inputSchema.physicalFieldTypeInfo,
           schema.physicalType,
+          Array[Int](),
           needMerge)
 
       windowedStream
-        .aggregate(
-          aggFunction,
-          windowFunction,
-          accumulatorRowType,
-          aggResultRowType,
-          schema.physicalTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(nonKeyedAggOpName)
     }
   }
@@ -199,9 +204,10 @@ class DataStreamGroupWindowAggregate(
 
 object DataStreamGroupWindowAggregate {
 
-
-  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
-    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+  private def createKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: KeyedStream[CRow, Tuple]):
+    WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
 
     case TumblingGroupWindow(_, timeField, size)
         if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
@@ -250,8 +256,10 @@ object DataStreamGroupWindowAggregate {
       stream.window(EventTimeSessionWindows.withGap(toTime(gap)))
   }
 
-  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
-    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+  private def createNonKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: DataStream[CRow]):
+    AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
 
     case TumblingGroupWindow(_, timeField, size)
         if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
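
The guard added above rejects plans in which a windowed GroupBy consumes the retraction stream of an upstream non-windowed GroupBy. As a purely hypothetical illustration of the rejected plan shape (table, field, and window names below are made up and not part of this commit), a Table API program like the following would now fail with the TableException during translation:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._

    // hypothetical: "orders" is a registered streaming table with a
    // processing-time attribute 'proctime
    val result = tableEnv.scan("orders")
      .groupBy('user)
      .select('user, 'amount.sum as 'total)              // non-windowed GroupBy: runs in AccRetract mode
      .window(Tumble over 10.minutes on 'proctime as 'w) // windowed GroupBy downstream
      .groupBy('w, 'user)
      .select('user, 'total.sum)                         // -> TableException at translation time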

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index fb912c4..e823cd6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -20,21 +20,22 @@ package org.apache.flink.table.plan.nodes.datastream
 import java.util.{List => JList}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.types.Row
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 class DataStreamOverAggregate(
     logicWindow: Window,
@@ -87,7 +88,7 @@ class DataStreamOverAggregate(
           namedAggregates))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     if (logicWindow.groups.size > 1) {
       throw new TableException(
         "Unsupported use of OVER windows. All aggregates must be computed on 
the same window.")
@@ -110,6 +111,8 @@ class DataStreamOverAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
@@ -120,6 +123,12 @@ class DataStreamOverAggregate(
       .get(orderKey.getFieldIndex)
       .getType
 
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on Over window aggregation is not supported yet. " +
+          "Note: Over window aggregation should not follow a non-windowed 
GroupBy aggregation.")
+    }
+
     timeType match {
       case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
         // proc-time OVER window
@@ -138,8 +147,7 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = false,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -154,16 +162,14 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
        } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           createBoundedAndCurrentRowOverWindow(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -177,12 +183,14 @@ class DataStreamOverAggregate(
 
   def createUnboundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
+
     val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
       namedAggregate =>
         new CalcitePair[AggregateCall, String](
@@ -190,6 +198,9 @@ class DataStreamOverAggregate(
           namedAggregate.right)
     }
 
+    // get the output types
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
     val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
       generator,
       namedAggregates,
@@ -200,30 +211,28 @@ class DataStreamOverAggregate(
       partitionKeys.nonEmpty,
       isRowsClause)
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(schema.physicalTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
+          .asInstanceOf[DataStream[CRow]]
       }
       // non-partitioned aggregation
       else {
         if (isRowTimeType) {
-          inputDS.keyBy(new NullByteKeySelector[Row])
+          inputDS.keyBy(new NullByteKeySelector[CRow])
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(schema.physicalTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         } else {
           inputDS
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(schema.physicalTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         }
       }
     result
@@ -231,9 +240,9 @@ class DataStreamOverAggregate(
 
   def createBoundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
@@ -245,7 +254,10 @@ class DataStreamOverAggregate(
     }
 
     val precedingOffset =
-      getLowerBoundary(logicWindow, overWindow, input) + (if (isRowsClause) 1 else 0)
+      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
+
+    // get the output types
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
 
     val processFunction = AggregateUtil.createBoundedOverProcessFunction(
       generator,
@@ -257,24 +269,22 @@ class DataStreamOverAggregate(
       isRowsClause,
       isRowTimeType
     )
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(schema.physicalTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
       // non-partitioned aggregation
       else {
         inputDS
-          .keyBy(new NullByteKeySelector[Row])
+          .keyBy(new NullByteKeySelector[CRow])
           .process(processFunction).setParallelism(1).setMaxParallelism(1)
-          .returns(schema.physicalTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
     result
   }
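
With CRow as the stream element type, the translation above builds the output TypeInformation once and hands it to returns(), which is why the asInstanceOf casts on the result stream could be dropped. The partitioned branch boils down to this pattern (identifiers taken from the diff; processFunction is the ProcessFunction produced by AggregateUtil):

    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)

    inputDS
      .keyBy(partitionKeys: _*)
      .process(processFunction)   // emits CRow records
      .returns(returnTypeInfo)    // DataStream[CRow], no cast required
      .name(aggOpName)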

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 128da81..9754de4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRelNode
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.CRow
 
 trait DataStreamRel extends FlinkRelNode {
 
@@ -29,9 +29,9 @@ trait DataStreamRel extends FlinkRelNode {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @return DataStream of type [[Row]]
+    * @return DataStream of type [[CRow]]
     */
-  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
+  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
 
   /**
    * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction
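
The CRow type that all DataStreamRel nodes now produce pairs a Row with a change flag: true marks an accumulate message, false a retract message. A simplified sketch of its shape (the committed class lives in org.apache.flink.table.runtime.types; only the row and change members are relied on in the code in this commit):

    // simplified sketch of the CRow shape, not the committed implementation
    class CRow(var row: Row, var change: Boolean) {
      def this() = this(null, true)
    }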

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 05f60ba..c613646 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -24,8 +24,9 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema}
-import org.apache.flink.types.Row
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.types.CRow
 
 /**
   * Flink RelNode which matches along with DataStreamSource.
@@ -53,7 +54,7 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     convertToInternalRow(schema, inputDataStream, dataStreamTable, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 47b4946..654c259 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.CRow
 
 /**
   * Flink RelNode which matches along with Union.
@@ -58,7 +58,7 @@ class DataStreamUnion(
     s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
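
Since both inputs already arrive as DataStream[CRow], the union itself needs no further conversion; presumably the remainder of translateToPlan is just the direct DataStream union:

    leftDataSet.union(rightDataSet)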

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index c964e03..32c9aaf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.io.CRowValuesInputFormat
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConverters._
 
@@ -56,10 +56,11 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
+    val returnType = CRowTypeInfo(schema.physicalTypeInfo)
     val generator = new CodeGenerator(config)
 
     // generate code for every record
@@ -76,12 +77,12 @@ class DataStreamValues(
       generatedRecords.map(_.code),
       schema.physicalTypeInfo)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new CRowValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
-      generatedFunction.returnType)
+      returnType)
 
-    tableEnv.execEnv.createInput(inputFormat, schema.physicalTypeInfo)
+    tableEnv.execEnv.createInput(inputFormat, returnType)
   }
 
 }
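
CRowValuesInputFormat replaces ValuesInputFormat[Row] so that the generated literal rows enter the pipeline as CRows; the essential difference is only the wrapping. A hedged sketch of the idea (this is not the committed class, which compiles generated code; a plain Row iterator stands in):

    // hedged sketch: emit each generated Row as an accumulate CRow
    class ValuesWrappingSketch(rows: Iterator[Row]) extends Iterator[CRow] {
      private val outCRow = new CRow(null, true)  // literal values are always accumulates
      override def hasNext: Boolean = rows.hasNext
      override def next(): CRow = {
        outCRow.row = rows.next()
        outCRow
      }
    }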

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index dd82819..25e72fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -22,46 +22,51 @@ import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.plan.nodes.CommonScan
-import org.apache.flink.table.plan.schema.{FlinkTable, RowSchema}
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.CRowOutputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConverters._
 
-trait StreamScan extends CommonScan with DataStreamRel {
+trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
   protected def convertToInternalRow(
       schema: RowSchema,
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
       config: TableConfig)
-    : DataStream[Row] = {
+    : DataStream[CRow] = {
+
+    val inputType = input.getType
+    val internalType = CRowTypeInfo(schema.physicalTypeInfo)
 
     // conversion
-    if (needsConversion(input.getType, schema.physicalTypeInfo)) {
+    if (needsConversion(input.getType, internalType)) {
 
       val function = generatedConversionFunction(
         config,
         classOf[MapFunction[Any, Row]],
-        input.getType,
+        inputType,
         schema.physicalTypeInfo,
         "DataStreamSourceConversion",
         schema.physicalFieldNames,
         Some(flinkTable.fieldIndexes))
 
-      val runner = new MapRunner[Any, Row](
+      val mapFunc = new CRowOutputMapRunner(
         function.name,
         function.code,
-        function.returnType)
+        internalType)
 
-      val opName = s"from: (${schema.logicalFieldNames.mkString(", ")})"
+      val opName = s"from: 
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
       // TODO we need a ProcessFunction here
-      input.map(runner).name(opName)
+      input.map(mapFunc).name(opName).returns(internalType)
     }
     // no conversion necessary, forward
     else {
-      input.asInstanceOf[DataStream[Row]]
+      input.asInstanceOf[DataStream[CRow]]
     }
   }
 }
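
CRowOutputMapRunner plays the analogous role for scans: it compiles the generated conversion function and wraps every Row it produces into an accumulate CRow. A hedged sketch of just the wrapping (code generation and compilation omitted; a plain MapFunction stands in for the generated one):

    // hedged sketch: adapt a Row-producing conversion into a CRow-producing one
    class OutputWrappingSketch(inner: MapFunction[Any, Row])
        extends MapFunction[Any, CRow] {
      private val outCRow = new CRow(null, true)  // source records are accumulates
      override def map(in: Any): CRow = {
        outCRow.row = inner.map(in)
        outCRow
      }
    }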

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index e34e416..b2d7019 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,9 +25,11 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable}
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.sources._
-import org.apache.flink.types.Row
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -96,7 +98,7 @@ class StreamTableSourceScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
     convertToInternalRow(

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
index ec90392..0ca079e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -34,7 +34,7 @@ class FlinkLogicalCalc(
     calcProgram: RexProgram)
   extends Calc(cluster, traitSet, input, calcProgram)
   with FlinkLogicalRel
-  with CommonCalc {
+  with CommonCalc[Any] {
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
     new FlinkLogicalCalc(cluster, traitSet, child, program)

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index aeb67b6..bd9a7ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -82,6 +82,14 @@ object DataStreamRetractionRules {
   }
 
   /**
+    * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+    */
+  def isAccRetract(node: RelNode): Boolean = {
+    val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+    null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+  }
+
+  /**
    * Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
    * The default is to not publish updates as retraction messages and [[AccMode.Acc]].
     */
@@ -190,14 +198,6 @@ object DataStreamRetractionRules {
     }
 
     /**
-      * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
-      */
-    def isAccRetract(node: RelNode): Boolean = {
-      val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
-      null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
-    }
-
-    /**
       * Set [[AccMode.AccRetract]] to a [[RelNode]].
       */
     def setAccRetract(relNode: RelNode): RelNode = {
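
Hoisting isAccRetract out of the private rule scope into the enclosing object is what lets physical nodes query the AccMode trait during translation; the guards added to the aggregate nodes above all follow this pattern (the message placeholder stands for the operator-specific text):

    val consumeRetraction = DataStreamRetractionRules.isAccRetract(getInput)
    if (consumeRetraction) {
      throw new TableException("Retraction on ... aggregation is not supported yet.")
    }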

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
new file mode 100644
index 0000000..66e51b1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowCorrelateFlatMapRunner(
+    flatMapName: String,
+    flatMapCode: String,
+    collectorName: String,
+    collectorCode: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[Any] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var collector: TableFunctionCollector[_] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n 
Code:\n$collectorCode")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, 
collectorName, collectorCode)
+    LOG.debug("Instantiating TableFunctionCollector.")
+    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+    this.cRowWrapper = new CRowWrappingCollector()
+
+    LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n 
Code:\n$flatMapCode")
+    val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, 
flatMapName, flatMapCode)
+    val constructor = 
flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = 
constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+
+    collector.setCollector(cRowWrapper)
+    collector.setInput(in.row)
+    collector.reset()
+
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
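
A hedged sketch of how a correlate translation would wire up this runner; the generated-artifact names (flatMapFunc, collectorFunc) and the operator name are illustrative, not taken from the commit:

    val runner = new CRowCorrelateFlatMapRunner(
      flatMapFunc.name,                   // generated FlatMapFunction[Row, Row]
      flatMapFunc.code,
      collectorFunc.name,                 // generated TableFunctionCollector
      collectorFunc.code,
      CRowTypeInfo(schema.physicalTypeInfo))

    inputDataStream.flatMap(runner).name(correlateOpName)  // DataStream[CRow] in and out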

http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
new file mode 100644
index 0000000..9a4650b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * FlatMapRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowFlatMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[FlatMapFunction[Row, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = clazz.newInstance()
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
+
+
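
The same wiring applies to the simpler runner, e.g. from a Calc translation; the generated-function and operator names are again illustrative:

    val runner = new CRowFlatMapRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.physicalTypeInfo))

    // change flags of incoming CRows are forwarded via CRowWrappingCollector
    inputDataStream.flatMap(runner).name(calcOpName)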
