[FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.
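
The change routes all internal DataStream plans through a CRow ("change
row") wrapper: each record carries a payload Row plus a boolean change
flag, so non-windowed group aggregates can retract a previous result
before emitting the updated one. A minimal, dependency-free sketch of
the idea (the flag's field name is assumed from the constructor calls
in this diff; see CRow.scala below for the committed class):

    // Illustrative stand-in for org.apache.flink.table.runtime.types.CRow.
    class CRowSketch(var row: Seq[Any], var change: Boolean) {
      // Render retractions with "-" and accumulations with "+".
      override def toString: String =
        (if (change) "+" else "-") + row.mkString("(", ",", ")")
    }

    object CRowSketchDemo extends App {
      // An updating aggregate retracts its old result (change = false)
      // before emitting the new one (change = true).
      println(new CRowSketch(Seq("user1", 2L), change = false)) // -(user1,2)
      println(new CRowSketch(Seq("user1", 3L), change = true))  // +(user1,3)
    }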


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b237a3ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b237a3ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b237a3ef

Branch: refs/heads/table-retraction
Commit: b237a3ef0a6632a5a4fa474e42b2f69d24b74dd5
Parents: 5ba0f02
Author: Hequn Cheng <chenghe...@gmail.com>
Authored: Tue Apr 18 16:54:09 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Apr 26 12:20:22 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  63 +++-
 .../table/api/StreamTableEnvironment.scala      |  62 +++-
 .../flink/table/api/TableEnvironment.scala      |  65 ++--
 .../flink/table/plan/nodes/CommonCalc.scala     |  26 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  58 +---
 .../flink/table/plan/nodes/CommonScan.scala     |  25 +-
 .../table/plan/nodes/dataset/BatchScan.scala    |  10 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  23 +-
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  34 +-
 .../plan/nodes/dataset/DataSetValues.scala      |   2 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  30 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |  40 ++-
 .../datastream/DataStreamGroupAggregate.scala   |  23 +-
 .../DataStreamGroupWindowAggregate.scala        |  38 ++-
 .../datastream/DataStreamOverAggregate.scala    |  61 ++--
 .../plan/nodes/datastream/DataStreamRel.scala   |   6 +-
 .../datastream/DataStreamRetractionRules.scala  |  16 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   3 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   3 +-
 .../nodes/datastream/DataStreamValues.scala     |  14 +-
 .../plan/nodes/datastream/StreamScan.scala      |  23 +-
 .../datastream/StreamTableSourceScan.scala      |   4 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |  83 +++++
 .../flink/table/runtime/CRowFlatMapRunner.scala |  72 ++++
 .../table/runtime/CRowInputMapRunner.scala      |  57 ++++
 .../table/runtime/CRowOutputMapRunner.scala     |  60 ++++
 .../table/runtime/CRowWrappingCollector.scala   |  41 +++
 .../flink/table/runtime/FlatMapRunner.scala     |  17 +-
 .../aggregate/AggregateAggFunction.scala        |  11 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  36 +-
 ...SetSessionWindowAggReduceGroupFunction.scala |   4 +-
 ...taSetSlideWindowAggReduceGroupFunction.scala |   4 +-
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   4 +-
 .../aggregate/GroupAggProcessFunction.scala     |  74 +++--
 ...rementalAggregateAllTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateAllWindowFunction.scala |  11 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   7 +-
 .../IncrementalAggregateWindowFunction.scala    |  13 +-
 .../aggregate/ProcTimeBoundedRangeOver.scala    |  30 +-
 .../aggregate/ProcTimeBoundedRowsOver.scala     |  26 +-
 .../ProcTimeUnboundedNonPartitionedOver.scala   |  23 +-
 .../ProcTimeUnboundedPartitionedOver.scala      |  22 +-
 .../aggregate/RowTimeBoundedRangeOver.scala     |  32 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |  30 +-
 .../aggregate/RowTimeUnboundedOver.scala        |  50 +--
 .../aggregate/TimeWindowPropertyCollector.scala |  34 +-
 .../runtime/io/CRowValuesInputFormat.scala      |  59 ++++
 .../table/runtime/io/ValuesInputFormat.scala    |  17 +-
 .../apache/flink/table/runtime/types/CRow.scala |  55 +++
 .../table/runtime/types/CRowComparator.scala    |  83 +++++
 .../table/runtime/types/CRowSerializer.scala    |  78 +++++
 .../table/runtime/types/CRowTypeInfo.scala      |  98 ++++++
 .../apache/flink/table/sinks/CsvTableSink.scala |   2 +
 .../flink/table/TableEnvironmentTest.scala      |  68 +++-
 .../scala/batch/TableEnvironmentITCase.scala    |  20 ++
 .../api/scala/stream/RetractionITCase.scala     | 331 +++++++++++++++++++
 .../api/scala/stream/TableSinkITCase.scala      |   2 +-
 .../api/scala/stream/utils/StreamITCase.scala   |  11 +-
 ...ProcessingOverRangeProcessFunctionTest.scala | 105 +++---
 .../runtime/types/CRowComparatorTest.scala      |  61 ++++
 .../table/utils/MockTableEnvironment.scala      |   8 +
 61 files changed, 1889 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index b48e9f9..5111926 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,15 +26,19 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -127,6 +131,56 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Must not be unique but has to be a
+    *                     valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // Row to Row, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // Row to CRow, only needs to be wrapped
+      Some(
+        new RichMapFunction[Row, CRow] {
+          private var outCRow: CRow = _
+          override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
+          override def map(value: Row): CRow = {
+            outCRow.row = value
+            outCRow
+          }
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // some type that is neither Row nor CRow
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      val mapFunction = new MapRunner[IN, OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+
+      Some(mapFunction)
+    }
+  }
+
+  /**
    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
@@ -275,10 +329,15 @@ abstract class BatchTableEnvironment(
     logicalPlan match {
       case node: DataSetRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[Row, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataSet[A]]
         }
 
       case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 225a675..1b81cfc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -35,6 +36,8 @@ import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -168,6 +171,52 @@ abstract class StreamTableEnvironment(
     }
   }
 
+
+  /**
+    * Creates a final converter that maps the internal row type to external type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Must not be unique but has to be a
+    *                     valid Java class identifier.
+    */
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+  Option[MapFunction[IN, OUT]] = {
+
+    if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+      // CRow to CRow, no conversion needed
+      None
+    } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+      // CRow to Row, only needs to be unwrapped
+      Some(
+        new MapFunction[CRow, Row] {
+          override def map(value: CRow): Row = value.row
+        }.asInstanceOf[MapFunction[IN, OUT]]
+      )
+    } else {
+      // Some type that is neither CRow nor Row
+
+      val converterFunction = generateRowConverterFunction[OUT](
+        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+        logicalRowType,
+        requestedTypeInfo,
+        functionName
+      )
+
+      Some(new CRowInputMapRunner[OUT](
+        converterFunction.name,
+        converterFunction.code,
+        converterFunction.returnType)
+        .asInstanceOf[MapFunction[IN, OUT]])
+
+    }
+  }
+
   /**
    * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
     * catalog.
@@ -323,10 +372,15 @@ abstract class StreamTableEnvironment(
     logicalPlan match {
       case node: DataStreamRel =>
         val plan = node.translateToPlan(this)
-        val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+        val conversion =
+          getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
         conversion match {
          case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
-          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+          case Some(mapFunction: MapFunction[CRow, A]) =>
+            plan.map(mapFunction)
+              .returns(tpe)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataStream[A]]
         }
 
       case _ =>
@@ -344,9 +398,9 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](
+    val dataStream = translate[CRow](
       optimizedPlan,
-      ast.getRowType)(new GenericTypeInfo(classOf[Row]))
+      ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan
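
A dependency-free sketch of the three-way sink conversion that both
environments now implement (all names below are stand-ins; the committed
code dispatches on TypeInformation and builds the converter via code
generation, e.g. CRowInputMapRunner above):

    object SinkConversionSketch {
      case class Row(fields: Any*)                  // stand-in payload type
      case class CRow(row: Row, change: Boolean)    // stand-in wrapper type

      // CRow -> CRow: no conversion; CRow -> Row: unwrap the payload;
      // anything else: a (here simulated) code-generated converter.
      def conversionMapper[T](requested: Class[T]): Option[CRow => Any] =
        if (requested == classOf[CRow]) None
        else if (requested == classOf[Row]) Some(c => c.row)
        else Some(c => c.row.fields.mkString("|")) // placeholder for codegen

      def main(args: Array[String]): Unit = {
        val in = CRow(Row("hello", 42), change = true)
        println(conversionMapper(classOf[Row]).map(_.apply(in))) // Some(Row(...))
        println(conversionMapper(classOf[CRow]))                 // None
      }
    }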

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2ddad45..7ebf8a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,14 +48,14 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
@@ -576,6 +576,18 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
         }
+      case cr: CRowTypeInfo =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = cr.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $cr")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression 
expected.")
+        }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex.map {
           case (UnresolvedFieldReference(name), idx) => (idx, name)
@@ -621,37 +633,38 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
    * Creates a final converter that maps the internal row type to external type.
     *
-    * @param physicalRowTypeInfo the input of the sink
+    * @param physicalTypeInfo the input of the sink
    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
    * @param functionName name of the map function. Must not be unique but has to be a
     *                     valid Java class identifier.
     */
-  protected def sinkConversion[T](
-      physicalRowTypeInfo: TypeInformation[Row],
+  protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    Option[MapFunction[IN, OUT]]
+
+  protected def generateRowConverterFunction[OUT](
+      inputTypeInfo: TypeInformation[Row],
       logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[T],
-      functionName: String)
-    : Option[MapFunction[Row, T]] = {
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    GeneratedFunction[MapFunction[Row, OUT], OUT] = {
 
     // validate that at least the field types of physical and logical type match
     // we do that here to make sure that plan translation was correct
     val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
-    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+    if (logicalRowTypeInfo != inputTypeInfo) {
       throw TableException("The field types of physical and logical row types 
do not match." +
         "This is a bug and should not happen. Please file an issue.")
     }
 
-    // requested type is a generic Row, no conversion needed
-    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
-          requestedTypeInfo.getTypeClass == classOf[Row]) {
-      return None
-    }
-
     // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
-      FlinkTypeFactory.toTypeInfo(relDataType.getType)
-    }
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala
+      .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
+
     // field names
     val logicalFieldNames = logicalRowType.getFieldNames.asScala
 
@@ -659,8 +672,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
       throw new TableException("Arity of result does not match requested 
type.")
     }
-    requestedTypeInfo match {
 
+    requestedTypeInfo match {
       // POJO type requested
       case pt: PojoTypeInfo[_] =>
         logicalFieldNames.zip(logicalFieldTypes) foreach {
@@ -707,7 +720,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     val generator = new CodeGenerator(
       config,
       false,
-      physicalRowTypeInfo,
+      inputTypeInfo,
       None,
       None)
 
@@ -721,20 +734,12 @@ abstract class TableEnvironment(val config: TableConfig) {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       functionName,
-      classOf[MapFunction[Row, T]],
+      classOf[MapFunction[Row, OUT]],
       body,
       requestedTypeInfo)
-
-    val mapFunction = new MapRunner[Row, T](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-    Some(mapFunction)
   }
-
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index bc25140..b7b54cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -26,21 +26,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait CommonCalc {
+trait CommonCalc[T] {
 
-  private[flink] def functionBody(
+  private[flink] def generateFunction(
       generator: CodeGenerator,
+      ruleDescription: String,
       inputType: TypeInformation[Row],
       rowType: RelDataType,
       calcProgram: RexProgram,
-      config: TableConfig)
-    : String = {
+      config: TableConfig):
+    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
 
@@ -53,7 +53,7 @@ trait CommonCalc {
       expandedExpressions)
 
     // only projection
-    if (condition == null) {
+    val body = if (condition == null) {
       s"""
         |${projection.code}
         |${generator.collectorTerm}.collect(${projection.resultTerm});
@@ -82,16 +82,12 @@ trait CommonCalc {
           |""".stripMargin
       }
     }
-  }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
-    : RichFlatMapFunction[Row, Row] = {
 
-    new FlatMapRunner[Row, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
+    generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      body,
+      returnType)
   }
 
   private[flink] def conditionToString(

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 6c4066b..83a68c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -23,12 +23,11 @@ import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
 import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
 import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -36,59 +35,12 @@ import scala.collection.JavaConverters._
 /**
   * Join a user-defined table function
   */
-trait CommonCorrelate {
-
-  /**
-    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-    * and user-defined table function.
-    */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
-      udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Row, Row] = {
-
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
-
-    val flatMap = generateFunction(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      joinType,
-      rexCall,
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Row, Row](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
+trait CommonCorrelate[T] {
 
   /**
     * Generates the flat map function to run the user-defined table function.
     */
-  private def generateFunction(
+  private[flink] def generateFunction(
       config: TableConfig,
       inputTypeInfo: TypeInformation[Row],
       udtfTypeInfo: TypeInformation[Any],
@@ -97,8 +49,8 @@ trait CommonCorrelate {
       joinType: SemiJoinType,
       rexCall: RexCall,
       pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+      ruleDescription: String):
+    GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
 
     val functionGenerator = new CodeGenerator(
       config,

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 274b602..5c44525 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -20,16 +20,14 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.types.Row
 
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan {
+trait CommonScan[T] {
 
   /**
     * We check if the input type is exactly the same as the internal row type.
@@ -37,20 +35,17 @@ trait CommonScan {
     */
   private[flink] def needsConversion(
       externalTypeInfo: TypeInformation[Any],
-      internalTypeInfo: TypeInformation[Row])
-    : Boolean = {
-
+      internalTypeInfo: TypeInformation[T]): Boolean =
     externalTypeInfo != internalTypeInfo
-  }
 
-  private[flink] def getConversionMapper(
+  private[flink] def generateConversionFunction(
       config: TableConfig,
       inputType: TypeInformation[Any],
       expectedType: TypeInformation[Row],
       conversionOperatorName: String,
       fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Row] = {
+      inputPojoFieldMapping: Option[Array[Int]] = None):
+    GeneratedFunction[MapFunction[Any, Row], Row] = {
 
     val generator = new CodeGenerator(
       config,
@@ -66,17 +61,11 @@ trait CommonScan {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    val genFunction = generator.generateFunction(
+    generator.generateFunction(
       conversionOperatorName,
       classOf[MapFunction[Any, Row]],
       body,
       expectedType)
-
-    new MapRunner[Any, Row](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
   }
 
 }
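
The pattern above repeats across CommonCalc, CommonCorrelate, and
CommonScan: the shared trait now stops at the GeneratedFunction, and
each physical node wraps the generated code in the runner matching its
runtime type (FlatMapRunner/MapRunner for DataSet[Row] nodes,
CRowFlatMapRunner and friends for DataStream[CRow] nodes). A schematic,
dependency-free sketch of that split (all names are stand-ins, not
Flink API):

    case class GeneratedFn(name: String, code: String)

    trait CommonNode[T] {
      // shared part: produce only the generated code, no runner
      def generateFunction(body: String): GeneratedFn = GeneratedFn("GenFn", body)
    }

    class DataSetNode extends CommonNode[String] {        // Row runtime
      def translateToPlan(): String =
        s"FlatMapRunner(${generateFunction("...").name})"
    }

    class DataStreamNode extends CommonNode[String] {     // CRow runtime
      def translateToPlan(): String =
        s"CRowFlatMapRunner(${generateFunction("...").name})"
    }

    object RunnerSplitDemo extends App {
      println(new DataSetNode().translateToPlan())    // FlatMapRunner(GenFn)
      println(new DataStreamNode().translateToPlan()) // CRowFlatMapRunner(GenFn)
    }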

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index b39b8ed..64286f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -23,12 +23,13 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait BatchScan extends CommonScan with DataSetRel {
+trait BatchScan extends CommonScan[Row] with DataSetRel {
 
   protected def convertToInternalRow(
       input: DataSet[Any],
@@ -43,7 +44,7 @@ trait BatchScan extends CommonScan with DataSetRel {
     // conversion
     if (needsConversion(inputType, internalType)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generateConversionFunction(
         config,
         inputType,
         internalType,
@@ -51,6 +52,11 @@ trait BatchScan extends CommonScan with DataSetRel {
         getRowType.getFieldNames,
         Some(flinkTable.fieldIndexes))
 
+      val mapFunc = new MapRunner[Any, Row](
+        function.name,
+        function.code,
+        function.returnType)
+
       val opName = s"from: 
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
       input.map(mapFunc).name(opName)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e05b5a8..359f5a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.FlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -44,7 +47,7 @@ class DataSetCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[Row]
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -83,24 +86,22 @@ class DataSetCalc(
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val returnType = FlinkTypeFactory
+      .toInternalRowTypeInfo(getRowType)
+      .asInstanceOf[RowTypeInfo]
 
     val generator = new CodeGenerator(config, false, inputDS.getType)
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
+      ruleDescription,
       inputDS.getType,
       getRowType,
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      returnType)
+    val mapFunc = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
 
-    val mapFunc = calcMapFunction(genFunction)
     inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index c18a829..c658012 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -26,9 +26,11 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.table.runtime.CorrelateFlatMapRunner
 import org.apache.flink.types.Row
 
 /**
@@ -45,7 +47,7 @@ class DataSetCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, inputNode)
-  with CommonCorrelate
+  with CommonCorrelate[Row]
   with DataSetRel {
 
   override def deriveRowType() = relRowType
@@ -95,20 +97,38 @@ class DataSetCorrelate(
     val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val mapFunc = correlateMapFunction(
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+    val flatMap = generateFunction(
       config,
       inputDS.getType,
       udtfTypeInfo,
-      getRowType,
+      returnType,
+      rowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
+    val collector = generateCollector(
+      config,
+      inputDS.getType,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+
     inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 3ebee2c..948dd27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -88,7 +88,7 @@ class DataSetValues(
       generatedRecords.map(_.code),
       returnType)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new ValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
       generatedFunction.returnType)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b015a1d..0bf723d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 
 /**
@@ -44,7 +47,7 @@ class DataStreamCalc(
     calcProgram: RexProgram,
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
-  with CommonCalc
+  with CommonCalc[CRow]
   with DataStreamRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -75,28 +78,29 @@ class DataStreamCalc(
     estimateRowCount(calcProgram, rowCnt)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
+    val outputRowType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-    val generator = new CodeGenerator(config, false, inputDataStream.getType)
+    val generator = new CodeGenerator(config, false, inputRowType)
 
-    val body = functionBody(
+    val genFunction = generateFunction(
       generator,
-      inputDataStream.getType,
+      ruleDescription,
+      inputRowType,
       getRowType,
       calcProgram,
       config)
 
-    val genFunction = generator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Row, Row]],
-      body,
-      FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
+    val mapFunc = new CRowFlatMapRunner(
+      genFunction.name,
+      genFunction.code,
+      CRowTypeInfo(outputRowType))
 
-    val mapFunc = calcMapFunction(genFunction)
     inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 9ea413a..c4389f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -25,10 +25,12 @@ import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.CommonCorrelate
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   * Flink RelNode which matches along with join a user defined table function.
@@ -44,7 +46,7 @@ class DataStreamCorrelate(
     joinType: SemiJoinType,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
-  with CommonCorrelate
+  with CommonCorrelate[CRow]
   with DataStreamRel {
 
   override def deriveRowType() = relRowType
@@ -79,30 +81,50 @@ class DataStreamCorrelate(
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
     // we do not need to specify input type
     val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
 
     val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
     val rexCall = funcRel.getCall.asInstanceOf[RexCall]
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+    val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
 
-    val mapFunc = correlateMapFunction(
+    val rowType = inputType.rowType
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(relRowType)
+
+    val flatMap = generateFunction(
       config,
-      inputDS.getType,
+      rowType,
       udtfTypeInfo,
+      returnType,
       getRowType,
       joinType,
       rexCall,
-      condition,
-      Some(pojoFieldMapping),
+      pojoFieldMapping,
       ruleDescription)
 
+    val collector = generateCollector(
+      config,
+      rowType,
+      udtfTypeInfo,
+      returnType,
+      getRowType,
+      condition,
+      pojoFieldMapping)
+
+    val mapFunc = new CRowCorrelateFlatMapRunner(
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      CRowTypeInfo(flatMap.returnType))
+
     inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index d4aa33c..04b84ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -27,8 +27,8 @@ import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.types.Row
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 /**
   *
@@ -90,11 +90,10 @@ class DataStreamGroupAggregate(
       .item("select", aggregationToString(inputType, groupings, getRowType, 
namedAggregates, Nil))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val aggString = aggregationToString(
       inputType,
@@ -110,28 +109,30 @@ class DataStreamGroupAggregate(
     val processFunction = AggregateUtil.createGroupAggregateFunction(
       namedAggregates,
       inputType,
-      groupings)
+      groupings,
+      DataStreamRetractionRules.isAccRetract(this),
+      DataStreamRetractionRules.isAccRetract(getInput))
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // grouped / keyed aggregation
       if (groupings.nonEmpty) {
         inputDS
         .keyBy(groupings: _*)
         .process(processFunction)
-        .returns(rowTypeInfo)
+        .returns(outRowType)
         .name(keyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
       // global / non-keyed aggregation
       else {
         inputDS
-        .keyBy(new NullByteKeySelector[Row])
+        .keyBy(new NullByteKeySelector[CRow])
         .process(processFunction)
         .setParallelism(1)
         .setMaxParallelism(1)
-        .returns(rowTypeInfo)
+        .returns(outRowType)
         .name(nonKeyedAggOpName)
-        .asInstanceOf[DataStream[Row]]
+        .asInstanceOf[DataStream[CRow]]
       }
     result
   }
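
For context, a dependency-free simulation of the accumulate/retract
protocol that the rewired GroupAggProcessFunction follows (names and
structure here are illustrative only, not the committed code): when a
group's aggregate changes, the old result is retracted (change = false)
before the new result is accumulated (change = true).

    object RetractingCountSketch {
      // (change, key, count): change=false retracts a previously
      // emitted result, change=true accumulates the new one.
      type Message = (Boolean, String, Long)

      def main(args: Array[String]): Unit = {
        val counts = scala.collection.mutable.Map.empty[String, Long]
        val out = scala.collection.mutable.Buffer.empty[Message]

        for (key <- Seq("a", "b", "a")) {
          counts.get(key).foreach(prev => out += ((false, key, prev))) // retract
          val next = counts.getOrElse(key, 0L) + 1
          counts(key) = next
          out += ((true, key, next))                                   // accumulate
        }

        out.foreach(println)
        // (true,a,1) (true,b,1) (false,a,1) (true,a,2)
      }
    }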

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c78e8bd..e177e63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream,
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.expressions._
@@ -36,9 +36,9 @@ import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -102,12 +102,21 @@ class DataStreamGroupWindowAggregate(
           namedProperties))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val groupingKeys = grouping.indices.toArray
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on windowed GroupBy aggregation is not supported yet. " +
+          "Note: Windowed GroupBy aggregation should not follow a " +
+          "non-windowed GroupBy aggregation.")
+    }
+
+    val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val aggString = aggregationToString(
       inputType,
@@ -133,7 +142,7 @@ class DataStreamGroupWindowAggregate(
       val keyedStream = inputDS.keyBy(groupingKeys: _*)
       val windowedStream =
         createKeyedWindowedStream(window, keyedStream)
-          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+          .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamGroupWindowAggregateFunction(
@@ -143,7 +152,7 @@ class DataStreamGroupWindowAggregate(
           grouping)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(keyedAggOpName)
     }
     // global / non-keyed aggregation
@@ -155,7 +164,7 @@ class DataStreamGroupWindowAggregate(
 
       val windowedStream =
         createNonKeyedWindowedStream(window, inputDS)
-          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+          .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamGroupWindowAggregateFunction(
@@ -165,7 +174,7 @@ class DataStreamGroupWindowAggregate(
           grouping)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
         .name(nonKeyedAggOpName)
     }
   }
@@ -173,9 +182,10 @@ class DataStreamGroupWindowAggregate(
 
 object DataStreamGroupWindowAggregate {
 
-
-  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
-    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+  private def createKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: KeyedStream[CRow, Tuple]):
+    WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
 
     case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
       stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
@@ -216,8 +226,10 @@ object DataStreamGroupWindowAggregate {
       stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
   }
 
-  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
-    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+  private def createNonKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: DataStream[CRow]):
+    AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
 
     case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
       stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 031d533..aefecba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -25,18 +25,16 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.types.Row
-
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 class DataStreamOverAggregate(
     logicWindow: Window,
@@ -87,7 +85,7 @@ class DataStreamOverAggregate(
           namedAggregates))
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     if (logicWindow.groups.size > 1) {
       throw new TableException(
         "Unsupported use of OVER windows. All aggregates must be computed on 
the same window.")
@@ -110,6 +108,8 @@ class DataStreamOverAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
@@ -120,6 +120,12 @@ class DataStreamOverAggregate(
       .get(orderKey.getFieldIndex)
       .getValue
 
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on Over window aggregation is not supported yet. " +
+          "Note: Over window aggregation should not follow a non-windowed 
GroupBy aggregation.")
+    }
+
     timeType match {
       case _: ProcTimeType =>
         // proc-time OVER window
@@ -138,8 +144,7 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = false,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -153,16 +158,14 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-          )
+            isRowsClause = overWindow.isRows)
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           createBoundedAndCurrentRowOverWindow(
             generator,
             inputDS,
             isRowTimeType = true,
-            isRowsClause = overWindow.isRows
-            )
+            isRowsClause = overWindow.isRows)
         } else {
           throw new TableException(
             "OVER RANGE FOLLOWING windows are not supported yet.")
@@ -177,16 +180,16 @@ class DataStreamOverAggregate(
 
   def createUnboundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
       generator,
@@ -196,30 +199,28 @@ class DataStreamOverAggregate(
       partitionKeys.nonEmpty,
       isRowsClause)
 
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
+          .asInstanceOf[DataStream[CRow]]
       }
       // non-partitioned aggregation
       else {
         if (isRowTimeType) {
-          inputDS.keyBy(new NullByteKeySelector[Row])
+          inputDS.keyBy(new NullByteKeySelector[CRow])
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         } else {
           inputDS
             .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(rowTypeInfo)
+            .returns(returnTypeInfo)
             .name(aggOpName)
-            .asInstanceOf[DataStream[Row]]
         }
       }
     result
@@ -227,9 +228,9 @@ class DataStreamOverAggregate(
 
   def createBoundedAndCurrentRowOverWindow(
     generator: CodeGenerator,
-    inputDS: DataStream[Row],
+    inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
-    isRowsClause: Boolean): DataStream[Row] = {
+    isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
     val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -239,9 +240,9 @@ class DataStreamOverAggregate(
       getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
 
     // get the output types
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+    val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
-    val processFunction = AggregateUtil.createBoundedOverProcessFunction(
+    val processFunction = AggregateUtil.createBoundedOverProcessFunction[CRow](
       generator,
       namedAggregates,
       inputType,
@@ -249,24 +250,22 @@ class DataStreamOverAggregate(
       isRowsClause,
       isRowTimeType
     )
-    val result: DataStream[Row] =
+    val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(partitionKeys: _*)
           .process(processFunction)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
       // non-partitioned aggregation
       else {
         inputDS
-          .keyBy(new NullByteKeySelector[Row])
+          .keyBy(new NullByteKeySelector[CRow])
           .process(processFunction).setParallelism(1).setMaxParallelism(1)
-          .returns(rowTypeInfo)
+          .returns(returnTypeInfo)
           .name(aggOpName)
-          .asInstanceOf[DataStream[Row]]
       }
     result
   }
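
The guard added above is needed because every process function in this file
reads only in.row: a retract message would be accumulated like a fresh insert
and corrupt the OVER aggregate. A minimal sketch of the traffic an upstream
non-windowed GROUP BY emits once it runs in AccRetract mode (schema and
values are invented for illustration):

    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row

    def countRow(user: String, cnt: Long): Row = {
      val r = new Row(2)
      r.setField(0, user)
      r.setField(1, cnt)   // boxed to java.lang.Long
      r
    }

    // An update of user1's count from 1 to 2 travels as two messages:
    val update = Seq(
      new CRow(countRow("user1", 1L), false), // retract the old count
      new CRow(countRow("user1", 2L), true)   // accumulate the new count
    )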

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 1451a1b..99e9052 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.CRow
 
 trait DataStreamRel extends RelNode with FlinkRel {
 
@@ -30,9 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel {
     * Translates the FlinkRelNode into a Flink operator.
     *
     * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
-    * @return DataStream of type [[Row]]
+    * @return DataStream of type [[CRow]]
     */
-  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
+  def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
 
   /**
     * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala
index 432158d..cd2810e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRules.scala
@@ -81,6 +81,14 @@ object DataStreamRetractionRules {
   }
 
   /**
+    * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+    */
+  def isAccRetract(node: RelNode): Boolean = {
+    val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+    null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+  }
+
+  /**
     * Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
     * The default is to not publish updates as retraction messages and [[AccMode.Acc]].
     */
@@ -189,14 +197,6 @@ object DataStreamRetractionRules {
     }
 
     /**
-      * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
-      */
-    def isAccRetract(node: RelNode): Boolean = {
-      val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
-      null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
-    }
-
-    /**
       * Set [[AccMode.AccRetract]] to a [[RelNode]].
       */
     def setAccRetract(relNode: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c187ae8..1a43edb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
 /**
@@ -53,7 +54,7 @@ class DataStreamScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
     convertToInternalRow(inputDataStream, dataStreamTable, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index f676176..cf1e1c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -60,7 +61,7 @@ class DataStreamUnion(
     s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", 
")}))"
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 0ab4a48..5e8d127 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.io.CRowValuesInputFormat
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConverters._
 
@@ -57,11 +57,11 @@ class DataStreamValues(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
 
     val config = tableEnv.getConfig
 
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val returnType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     val generator = new CodeGenerator(config)
 
@@ -77,12 +77,12 @@ class DataStreamValues(
     val generatedFunction = generator.generateValuesInputFormat(
       ruleDescription,
       generatedRecords.map(_.code),
-      returnType)
+      returnType.rowType)
 
-    val inputFormat = new ValuesInputFormat[Row](
+    val inputFormat = new CRowValuesInputFormat(
       generatedFunction.name,
       generatedFunction.code,
-      generatedFunction.returnType)
+      returnType)
 
     tableEnv.execEnv.createInput(inputFormat, returnType)
   }
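
Note the split in the hunk above: the code generator still works on plain
rows (it receives returnType.rowType), while the input format and createInput
are typed on the CRow wrapper. A small sketch of that relationship, using
only the constructor and accessor visible in this diff (the field types are
invented):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.runtime.types.CRowTypeInfo

    val rowType = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO)

    val cRowType = CRowTypeInfo(rowType) // type of a DataStream[CRow]
    val forCodeGen = cRowType.rowType    // the wrapped RowTypeInfo again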

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 6d08302..02716cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -23,41 +23,46 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowOutputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
-trait StreamScan extends CommonScan with DataStreamRel {
+trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
   protected def convertToInternalRow(
       input: DataStream[Any],
       flinkTable: FlinkTable[_],
       config: TableConfig)
-    : DataStream[Row] = {
+    : DataStream[CRow] = {
 
     val inputType = input.getType
-
-    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val internalType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
 
     // conversion
     if (needsConversion(inputType, internalType)) {
 
-      val mapFunc = getConversionMapper(
+      val function = generateConversionFunction(
         config,
         inputType,
-        internalType,
+        internalType.rowType,
         "DataStreamSourceConversion",
         getRowType.getFieldNames,
         Some(flinkTable.fieldIndexes))
 
+      val mapFunc = new CRowOutputMapRunner(
+        function.name,
+        function.code,
+        internalType)
+
       val opName = s"from: 
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-      input.map(mapFunc).name(opName)
+      input.map(mapFunc).name(opName).returns(internalType)
     }
     // no conversion necessary, forward
     else {
-      input.asInstanceOf[DataStream[Row]]
+      input.asInstanceOf[DataStream[CRow]]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 013c55f..82407f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
-import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -60,7 +60,7 @@ class StreamTableSourceScan(
     )
   }
 
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
     convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
new file mode 100644
index 0000000..66e51b1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowCorrelateFlatMapRunner(
+    flatMapName: String,
+    flatMapCode: String,
+    collectorName: String,
+    collectorCode: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[Any] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var collector: TableFunctionCollector[_] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n 
Code:\n$collectorCode")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, 
collectorName, collectorCode)
+    LOG.debug("Instantiating TableFunctionCollector.")
+    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+    this.cRowWrapper = new CRowWrappingCollector()
+
+    LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n 
Code:\n$flatMapCode")
+    val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, 
flatMapName, flatMapCode)
+    val constructor = 
flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = 
constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+
+    collector.setCollector(cRowWrapper)
+    collector.setInput(in.row)
+    collector.reset()
+
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
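
The runner delegates change-flag propagation to the new CRowWrappingCollector,
whose source is not shown in this excerpt. An assumed sketch, reconstructed
from the call sites above (the public `out` field, setChange, and collecting
Rows on behalf of the wrapped function); details may differ from the
committed class:

    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    class CRowWrappingCollector extends Collector[Row] {

      // downstream collector, assigned by the runner on each invocation
      var out: Collector[CRow] = _

      // reused wrapper; only payload and flag are swapped per record
      val outCRow: CRow = new CRow(null, true)

      def setChange(change: Boolean): Unit = outCRow.change = change

      override def collect(record: Row): Unit = {
        outCRow.row = record
        out.collect(outCRow)
      }

      override def close(): Unit = out.close()
    }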

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
new file mode 100644
index 0000000..9a4650b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * FlatMapRunner with [[CRow]] input and [[CRow]] output.
+  */
+class CRowFlatMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichFlatMapFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[FlatMapFunction[Row, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[Row, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = clazz.newInstance()
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+
+    this.cRowWrapper = new CRowWrappingCollector()
+  }
+
+  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+    cRowWrapper.out = out
+    cRowWrapper.setChange(in.change)
+    function.flatMap(in.row, cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
new file mode 100644
index 0000000..8e95c93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] input.
+  */
+class CRowInputMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[CRow, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): OUT = {
+    function.map(in.row)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
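
CRowInputMapRunner unwraps CRows on the way out of the internal plan, e.g.
when a Table is converted back into a DataStream of an external type. Its
call site is outside this excerpt, so the wiring below is a hypothetical
sketch (name and code stand for a generated MapFunction[Row, OUT]); dropping
the change flag like this is only sound while the translated plan runs in
plain Acc mode:

    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.table.runtime.CRowInputMapRunner
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row

    def toExternal(
        cRows: DataStream[CRow],
        name: String,
        code: String,
        rowType: RowTypeInfo): DataStream[Row] =
      cRows.map(new CRowInputMapRunner[Row](name, code, rowType))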

http://git-wip-us.apache.org/repos/asf/flink/blob/b237a3ef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
new file mode 100644
index 0000000..966dea9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] output.
+  */
+class CRowOutputMapRunner(
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[CRow])
+  extends RichMapFunction[Any, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[MapFunction[Any, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Any, Row] = _
+  private var outCRow: CRow = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+    outCRow = new CRow(null, true)
+  }
+
+  override def map(in: Any): CRow = {
+    outCRow.row = function.map(in)
+    outCRow
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}
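
This is the input-side counterpart wired up in StreamScan above: the
generated MapFunction still produces plain Rows, and the runner marks every
incoming record as an accumulate message. Allocating a single CRow in open()
and swapping only its row field avoids one allocation per record; as usual
with Flink's object reuse, downstream operators must not cache the reference.
A sketch mirroring the StreamScan hunk (parameter names are illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.table.runtime.CRowOutputMapRunner
    import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}

    def toInternal(
        input: DataStream[Any],
        name: String,         // name of a generated MapFunction[Any, Row]
        code: String,         // its source, compiled in open()
        internalType: CRowTypeInfo): DataStream[CRow] =
      input
        .map(new CRowOutputMapRunner(name, code, internalType))
        .returns(internalType)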
