[FLINK-5662] [table] Rework internal type handling of Table API

This closes #3271.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6b225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6b225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6b225

Branch: refs/heads/master
Commit: 6bc6b225e55095eb8797db2903b0546410e7fdd9
Parents: 1ce10c8
Author: twalthr <twal...@apache.org>
Authored: Mon Feb 6 17:18:08 2017 +0100
Committer: twalthr <twal...@apache.org>
Committed: Mon Feb 13 17:50:00 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  34 ++-
 .../table/api/StreamTableEnvironment.scala      |  41 ++--
 .../apache/flink/table/api/TableConfig.scala    |  24 --
 .../flink/table/api/TableEnvironment.scala      | 130 +++++++++-
 .../flink/table/calcite/FlinkTypeFactory.scala  |  17 +-
 .../flink/table/codegen/CodeGenerator.scala     |  33 +--
 .../flink/table/codegen/ExpressionReducer.scala |  11 +-
 .../apache/flink/table/codegen/generated.scala  |  25 +-
 .../flink/table/plan/logical/operators.scala    |  18 +-
 .../table/plan/nodes/CommonAggregate.scala      |  69 ++++++
 .../flink/table/plan/nodes/CommonCalc.scala     | 152 ++++++++++++
 .../table/plan/nodes/CommonCorrelate.scala      | 229 ++++++++++++++++++
 .../flink/table/plan/nodes/CommonScan.scala     |  82 +++++++
 .../flink/table/plan/nodes/FlinkAggregate.scala |  69 ------
 .../flink/table/plan/nodes/FlinkCalc.scala      | 172 -------------
 .../flink/table/plan/nodes/FlinkCorrelate.scala | 233 ------------------
 .../flink/table/plan/nodes/FlinkRel.scala       |  37 ---
 .../table/plan/nodes/dataset/BatchScan.scala    |  61 ++---
 .../nodes/dataset/BatchTableSourceScan.scala    |   7 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |  82 ++-----
 .../table/plan/nodes/dataset/DataSetCalc.scala  |  27 +--
 .../plan/nodes/dataset/DataSetCorrelate.scala   |  12 +-
 .../plan/nodes/dataset/DataSetIntersect.scala   |  57 +----
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  32 ++-
 .../table/plan/nodes/dataset/DataSetMinus.scala |  57 +----
 .../table/plan/nodes/dataset/DataSetRel.scala   |  17 +-
 .../table/plan/nodes/dataset/DataSetScan.scala  |   8 +-
 .../nodes/dataset/DataSetSingleRowJoin.scala    |  48 ++--
 .../table/plan/nodes/dataset/DataSetSort.scala  |  49 +---
 .../table/plan/nodes/dataset/DataSetUnion.scala |  24 +-
 .../plan/nodes/dataset/DataSetValues.scala      |  18 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  |  68 ++----
 .../nodes/datastream/DataStreamAggregate.scala  | 239 ++++++++-----------
 .../plan/nodes/datastream/DataStreamCalc.scala  |  27 +--
 .../nodes/datastream/DataStreamCorrelate.scala  |  14 +-
 .../plan/nodes/datastream/DataStreamRel.scala   |  15 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |  10 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   6 +-
 .../nodes/datastream/DataStreamValues.scala     |  23 +-
 .../plan/nodes/datastream/StreamScan.scala      |  84 ++-----
 .../datastream/StreamTableSourceScan.scala      |  12 +-
 .../table/runtime/aggregate/AggregateUtil.scala | 120 +++++-----
 .../flink/table/typeutils/TypeConverter.scala   | 156 ------------
 .../api/java/batch/TableEnvironmentITCase.java  |   7 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   4 +-
 .../batch/utils/TableProgramsTestBase.scala     |  11 +-
 .../expressions/utils/ExpressionTestBase.scala  |  16 +-
 47 files changed, 1184 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index dd0487a..2dec00e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
@@ -135,7 +136,7 @@ abstract class BatchTableEnvironment(
   private[flink] def explain(table: Table, extended: Boolean): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataSet = translate[Row](optimizedPlan) 
(TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new 
GenericTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
@@ -250,28 +251,39 @@ abstract class BatchTableEnvironment(
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): 
DataSet[A] = {
-    val dataSetPlan = optimize(table.getRelNode)
-    translate(dataSetPlan)
+    val relNode = table.getRelNode
+    val dataSetPlan = optimize(relNode)
+    translate(dataSetPlan, relNode.getRowType)
   }
 
   /**
-    * Translates a logical [[RelNode]] into a [[DataSet]].
+    * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
     *
     * @param logicalPlan The root node of the relational expression tree.
+    * @param logicalType The row type of the result. Since the logicalPlan can lose the
+    *                    field naming during optimization, we pass the row type separately.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
     * @tparam A The type of the resulting [[DataSet]].
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](logicalPlan: RelNode)(implicit tpe: 
TypeInformation[A]): DataSet[A] = {
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType)
+      (implicit tpe: TypeInformation[A]): DataSet[A] = {
     TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataSet[A]]
-      case _ => ???
+        val plan = node.translateToPlan(this)
+        val conversion = sinkConversion(plan.getType, logicalType, tpe, 
"DataSetSinkConversion")
+        conversion match {
+          case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
+          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataSet due to an invalid 
logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
     }
   }
 }
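
For context, a minimal usage sketch (not part of this patch) of the reworked batch translation path; the 1.2-era API calls (`sql`, `toDataSet`) and all names below are assumptions for illustration. Requesting the internal `Row` type makes `sinkConversion` return `None`, so no conversion mapper is appended:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object BatchTranslateSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // register a DataSet as a table with explicit field names
        tEnv.registerDataSet("words", env.fromElements((1, "hello"), (2, "world")), 'id, 'text)

        // toDataSet ends up in translate(dataSetPlan, relNode.getRowType);
        // Row matches the internal type, so the plan is returned unconverted
        val result: DataSet[Row] = tEnv.sql("SELECT id, text FROM words").toDataSet[Row]
        result.print()
      }
    }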

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 81e884d..19c4af1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{Programs, RuleSet}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.explain.PlanJsonParser
@@ -200,11 +201,11 @@ abstract class StreamTableEnvironment(
     dataStream: DataStream[T],
     fields: Array[Expression]): Unit = {
 
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, 
fields.toArray)
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, 
fields)
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
-      fieldIndexes.toArray,
-      fieldNames.toArray
+      fieldIndexes,
+      fieldNames
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -255,30 +256,40 @@ abstract class StreamTableEnvironment(
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): 
DataStream[A] = {
-    val dataStreamPlan = optimize(table.getRelNode)
-    translate(dataStreamPlan)
+    val relNode = table.getRelNode
+    val dataStreamPlan = optimize(relNode)
+    translate(dataStreamPlan, relNode.getRowType)
   }
 
   /**
     * Translates a logical [[RelNode]] into a [[DataStream]].
     *
     * @param logicalPlan The root node of the relational expression tree.
+    * @param logicalType The row type of the result. Since the logicalPlan can lose the
+    *                    field naming during optimization, we pass the row type separately.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A]
-      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] 
= {
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType)
+      (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataStreamRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataStream[A]]
-      case _ => ???
+        val plan = node.translateToPlan(this)
+        val conversion = sinkConversion(plan.getType, logicalType, tpe, 
"DataStreamSinkConversion")
+        conversion match {
+          case None => plan.asInstanceOf[DataStream[A]] // no conversion 
necessary
+          case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataStream due to an invalid 
logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
     }
   }
 
@@ -291,7 +302,9 @@ abstract class StreamTableEnvironment(
   def explain(table: Table): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataStream = 
translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataStream = translate[Row](
+      optimizedPlan,
+      ast.getRowType)(new GenericTypeInfo(classOf[Row]))
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan
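
The streaming counterpart, sketched under the same assumptions (hypothetical names; 1.2-era `sql`/`toDataStream` calls). Requesting a case class instead of `Row` is what triggers the generated "to: ..." conversion mapper:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    // hypothetical result type; a case class is a composite (TupleTypeInfoBase) sink type
    case class WordCount(word: String, cnt: Long)

    object StreamTranslateSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        tEnv.registerDataStream("words", env.fromElements(("hello", 1L), ("world", 2L)), 'word, 'cnt)

        // the requested type differs from the internal Row type, so translate
        // appends the code-generated MapRunner named "to: ..."
        val result = tEnv.sql("SELECT word, cnt FROM words").toDataStream[WordCount]
        result.print()
        env.execute()
      }
    }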

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index a8876a8..6448657 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,12 +37,6 @@ class TableConfig {
   private var nullCheck: Boolean = true
 
   /**
-    * Defines if efficient types (such as Tuple types or Atomic types)
-    * should be used within operators where possible.
-    */
-  private var efficientTypeUsage = false
-
-  /**
     * Defines the configuration of Calcite for Table API and SQL queries.
     */
   private var calciteConfig = CalciteConfig.DEFAULT
@@ -73,24 +67,6 @@ class TableConfig {
   }
 
   /**
-    * Returns the usage of efficient types. If enabled, efficient types (such 
as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def getEfficientTypeUsage = efficientTypeUsage
-
-  /**
-    * Sets the usage of efficient types. If enabled, efficient types (such as 
Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
-    this.efficientTypeUsage = efficientTypeUsage
-  }
-
-  /**
     * Returns the current configuration of Calcite for Table API and SQL 
queries.
     */
   def getCalciteConfig: CalciteConfig = calciteConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bcff1fb..b36441a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,27 +31,30 @@ import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, 
RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => 
ScalaStreamExecEnv}
-import java.{BatchTableEnvironment => JavaBatchTableEnv, 
StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => 
JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => 
ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
 import org.apache.flink.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
 import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation,
 checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
 
@@ -410,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         }
         exprs.map {
           case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new TableException("Field reference expression expected.")
+          case _ => throw new TableException("Field reference expression requested.")
         }
       case t: TupleTypeInfo[A] =>
         exprs.zipWithIndex.map {
@@ -466,6 +469,123 @@ abstract class TableEnvironment(val config: TableConfig) {
     (fieldNames.toArray, fieldIndexes.toArray)
   }
 
+  /**
+    * Creates a final converter that maps the internal row type to the external type.
+    *
+    * @param physicalRowTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Does not need to be unique,
+    *                     but must be a valid Java class identifier.
+    */
+  protected def sinkConversion[T](
+      physicalRowTypeInfo: TypeInformation[Row],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[T],
+      functionName: String)
+    : Option[MapFunction[Row, T]] = {
+
+    // validate that at least the field types of the physical and logical row types match;
+    // we do that here to make sure that the plan translation was correct
+    val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
+    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+      throw TableException("The field types of physical and logical row types do not match. " +
+        "This is a bug and should not happen. Please file an issue.")
+    }
+
+    // requested type is a generic Row, no conversion needed
+    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
+          requestedTypeInfo.getTypeClass == classOf[Row]) {
+      return None
+    }
+
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { 
relDataType =>
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
+    }
+    // field names
+    val logicalFieldNames = logicalRowType.getFieldNames.asScala
+
+    // validate requested type
+    if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+      throw new TableException("Arity of result does not match requested 
type.")
+    }
+    requestedTypeInfo match {
+
+      // POJO type requested
+      case pt: PojoTypeInfo[_] =>
+        logicalFieldNames.zip(logicalFieldTypes) foreach {
+          case (fName, fType) =>
+            val pojoIdx = pt.getFieldIndex(fName)
+            if (pojoIdx < 0) {
+              throw new TableException(s"POJO does not define field name: 
$fName")
+            }
+            val requestedTypeInfo = pt.getTypeAt(pojoIdx)
+            if (fType != requestedTypeInfo) {
+              throw new TableException(s"Result field does not match requested 
type. " +
+                s"requested: $requestedTypeInfo; Actual: $fType")
+            }
+        }
+
+      // Tuple/Case class/Row type requested
+      case tt: TupleTypeInfoBase[_] =>
+        logicalFieldTypes.zipWithIndex foreach {
+          case (fieldTypeInfo, i) =>
+            val requestedTypeInfo = tt.getTypeAt(i)
+            if (fieldTypeInfo != requestedTypeInfo) {
+              throw new TableException(s"Result field does not match requested 
type. " +
+                s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
+            }
+        }
+
+      // Atomic type requested
+      case at: AtomicType[_] =>
+        if (logicalFieldTypes.size != 1) {
+          throw new TableException(s"Requested result type is an atomic type 
but " +
+            s"result has more or less than a single field.")
+        }
+        val fieldTypeInfo = logicalFieldTypes.head
+        if (fieldTypeInfo != at) {
+          throw new TableException(s"Result field does not match requested 
type. " +
+            s"Requested: $at; Actual: $fieldTypeInfo")
+        }
+
+      case _ =>
+        throw new TableException(s"Unsupported result type: 
$requestedTypeInfo")
+    }
+
+    // code generate MapFunction
+    val generator = new CodeGenerator(
+      config,
+      false,
+      physicalRowTypeInfo,
+      None,
+      None)
+
+    val conversion = generator.generateConverterResultExpression(
+      requestedTypeInfo,
+      logicalFieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      functionName,
+      classOf[MapFunction[Row, T]],
+      body,
+      requestedTypeInfo)
+
+    val mapFunction = new MapRunner[Row, T](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    Some(mapFunction)
+  }
+
 }
 
 /**
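
A self-contained illustration (not the committed code) of the dispatch that `sinkConversion` performs on the requested sink type; the `TypeInformation` classes are Flink's, and the categories mirror the match above:

    import org.apache.flink.api.common.typeinfo.{AtomicType, BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfoBase}

    // POJOs are validated by field name, tuples/case classes/rows by position,
    // atomic types must correspond to exactly one result field
    def requestedCategory(requested: TypeInformation[_]): String = requested match {
      case _: PojoTypeInfo[_]      => "POJO: fields matched by name"
      case _: TupleTypeInfoBase[_] => "Tuple/case class/Row: fields matched by position"
      case _: AtomicType[_]        => "atomic: result must have exactly one field"
      case other                   => s"unsupported result type: $other"
    }

    // a two-field row type falls into the positional branch
    requestedCategory(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))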

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index f3e2f91..251be14 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, 
PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.schema.{CompositeRelDataType, 
GenericRelDataType}
@@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
 import org.apache.flink.table.plan.schema.ArrayRelDataType
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.types.Row
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Flink specific type factory that represents the interface between Flink's 
[[TypeInformation]]
@@ -167,6 +169,19 @@ object FlinkTypeFactory {
         throw TableException(s"Type is not supported: $t")
   }
 
+  /**
+    * Converts a Calcite logical record type into Flink type information.
+    */
+  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] 
= {
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { 
relDataType =>
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
+    }
+    // field names
+    val logicalFieldNames = logicalRowType.getFieldNames.asScala
+    new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
+  }
+
   def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = 
relDataType.getSqlTypeName match {
     case BOOLEAN => BOOLEAN_TYPE_INFO
     case TINYINT => BYTE_TYPE_INFO
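
`toInternalRowTypeInfo` simply zips the Calcite field types and names into a `RowTypeInfo`. A hand-built equivalent of its output for a `(id BIGINT, text VARCHAR)` record (a sketch; the real method derives both arrays from the `RelDataType`):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    val fieldTypes: Array[TypeInformation[_]] =
      Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    val fieldNames = Array("id", "text")

    // the same (types, names) constructor the helper uses
    val internalRowType = new RowTypeInfo(fieldTypes, fieldNames)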

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d49d7a0..c679bd8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.runtime.TableFunctionCollector
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -62,8 +62,8 @@ import scala.collection.mutable
 class CodeGenerator(
    config: TableConfig,
    nullableInput: Boolean,
-   input1: TypeInformation[Any],
-   input2: Option[TypeInformation[Any]] = None,
+   input1: TypeInformation[_ <: Any],
+   input2: Option[TypeInformation[_ <: Any]] = None,
    input1PojoFieldMapping: Option[Array[Int]] = None,
    input2PojoFieldMapping: Option[Array[Int]] = None)
   extends RexVisitor[GeneratedExpression] {
@@ -112,7 +112,7 @@ class CodeGenerator(
     * @param config configuration that determines runtime behavior
     */
   def this(config: TableConfig) =
-    this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
+    this(config, false, new RowTypeInfo(), None, None)
 
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
@@ -224,15 +224,16 @@ class CodeGenerator(
     * @param bodyCode code contents of the SAM (Single Abstract Method). 
Inputs, collector, or
     *                 output record can be accessed via the given term methods.
     * @param returnType expected return type
-    * @tparam T Flink Function to be generated.
+    * @tparam F Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
     * @return instance of GeneratedFunction
     */
-  def generateFunction[T <: Function](
+  def generateFunction[F <: Function, T <: Any](
       name: String,
-      clazz: Class[T],
+      clazz: Class[F],
       bodyCode: String,
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[T] = {
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
     val funcName = newName(name)
 
     // Janino does not support generics, that's why we need
@@ -298,14 +299,14 @@ class CodeGenerator(
     *             valid Java class identifier.
     * @param records code for creating records
     * @param returnType expected return type
-    * @tparam T Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
     * @return instance of GeneratedFunction
     */
-  def generateValuesInputFormat[T](
+  def generateValuesInputFormat[T <: Row](
       name: String,
       records: Seq[String],
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[GenericInputFormat[T]] = {
+      returnType: TypeInformation[T])
+    : GeneratedInput[GenericInputFormat[T], T] = {
     val funcName = newName(name)
 
     addReusableOutRecord(returnType)
@@ -343,7 +344,7 @@ class CodeGenerator(
       }
     """.stripMargin
 
-    GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+    GeneratedInput(funcName, returnType, funcCode)
   }
 
   /**
@@ -1094,7 +1095,7 @@ class CodeGenerator(
   // 
----------------------------------------------------------------------------------------------
 
   private def generateInputAccess(
-      inputType: TypeInformation[Any],
+      inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
       pojoFieldMapping: Option[Array[Int]])
@@ -1122,7 +1123,7 @@ class CodeGenerator(
   }
 
   private def generateNullableInputFieldAccess(
-      inputType: TypeInformation[Any],
+      inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int,
       pojoFieldMapping: Option[Array[Int]])

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 94007de..0f1de21 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
@@ -39,7 +38,7 @@ import scala.collection.JavaConverters._
 class ExpressionReducer(config: TableConfig)
   extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
 
-  private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+  private val EMPTY_ROW_INFO = new RowTypeInfo()
   private val EMPTY_ROW = new Row(0)
 
   override def reduce(
@@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig)
       resultType.getFieldNames,
       literals)
 
-    val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+    val generatedFunction = generator.generateFunction[MapFunction[Row, Row], 
Row](
       "ExpressionReducer",
       classOf[MapFunction[Row, Row]],
       s"""
         |${result.code}
         |return ${result.resultTerm};
         |""".stripMargin,
-      resultType.asInstanceOf[TypeInformation[Any]])
+      resultType)
 
     val clazz = compile(getClass.getClassLoader, generatedFunction.name, 
generatedFunction.code)
     val function = clazz.newInstance()
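
Conceptually, the reducer compiles a `MapFunction[Row, Row]` and invokes it once on an empty row to fold constant expressions. A hand-written stand-in for such a generated class (hypothetical, for illustration only):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.types.Row

    // stand-in for a generated reducer function that folds the literal 1 + 2
    val folder = new MapFunction[Row, Row] {
      override def map(in: Row): Row = {
        val out = new Row(1)
        out.setField(0, Int.box(1 + 2)) // the folded constant
        out
      }
    }
    val reduced = folder.map(new Row(0)) // Row containing 3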

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
index b4c293d..271f686 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.codegen
 
+import org.apache.flink.api.common.functions
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 /**
@@ -41,14 +44,32 @@ object GeneratedExpression {
 }
 
 /**
-  * Describes a generated [[org.apache.flink.api.common.functions.Function]]
+  * Describes a generated [[functions.Function]]
   *
   * @param name class name of the generated Function.
   * @param returnType the type information of the result type
   * @param code code of the generated Function.
+  * @tparam F type of function
   * @tparam T type of function
   */
-case class GeneratedFunction[T](name: String, returnType: 
TypeInformation[Any], code: String)
+case class GeneratedFunction[F <: Function, T <: Any](
+  name: String,
+  returnType: TypeInformation[T],
+  code: String)
+
+/**
+  * Describes a generated [[InputFormat]].
+  *
+  * @param name class name of the generated input function.
+  * @param returnType the type information of the result type
+  * @param code code of the generated InputFormat.
+  * @tparam F type of the input format
+  * @tparam T return type of the input format
+  */
+case class GeneratedInput[F <: InputFormat[_, _], T <: Any](
+  name: String,
+  returnType: TypeInformation[T],
+  code: String)
 
 /**
   * Describes a generated [[org.apache.flink.util.Collector]].
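
How the two descriptors line up after this change, with hypothetical name and code strings (assumes the `org.apache.flink.table.codegen` package for the case classes); `GeneratedFunction` now tracks both the SAM interface `F` and the produced record type `T`, and `GeneratedInput` does the same for input formats:

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.io.GenericInputFormat
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    val rowType = new RowTypeInfo() // empty row type, illustration only
    val fn: GeneratedFunction[MapFunction[Row, Row], Row] =
      GeneratedFunction("MyMapper", rowType, "/* generated code */")
    val input: GeneratedInput[GenericInputFormat[Row], Row] =
      GeneratedInput("MyValuesInput", rowType, "/* generated code */")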

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 3ba0285..20f810a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -22,14 +22,13 @@ import java.util
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.CorrelationId
-import org.apache.calcite.rel.logical.{LogicalProject, 
LogicalTableFunctionScan}
+import org.apache.calcite.rel.core.{CorrelationId, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table._
 import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, 
UnresolvedException}
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.expressions._
@@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.table.typeutils.TypeConverter
 import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
 
 import scala.collection.JavaConverters._
@@ -426,11 +424,18 @@ case class Join(
     }
 
     relBuilder.join(
-      TypeConverter.flinkJoinTypeToRelType(joinType),
+      convertJoinType(joinType),
       
condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
       corSet.asJava)
   }
 
+  private def convertJoinType(joinType: JoinType) = joinType match {
+    case JoinType.INNER => JoinRelType.INNER
+    case JoinType.LEFT_OUTER => JoinRelType.LEFT
+    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+    case JoinType.FULL_OUTER => JoinRelType.FULL
+  }
+
   private def ambiguousName: Set[String] =
     left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
 
@@ -481,13 +486,12 @@ case class Join(
         if (checkIfFilterCondition(x)) {
           localPredicateFound = true
         }
-      case x: BinaryComparison => {
+      case x: BinaryComparison =>
         if (checkIfFilterCondition(x)) {
           localPredicateFound = true
         } else {
           nonEquiJoinPredicateFound = true
         }
-      }
       case x => failValidation(
         s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: 
$x")
     }
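
The inlined mapping (previously in the removed `TypeConverter`) is total over the Table API's four join types. A standalone restatement with a usage check (same cases as `convertJoinType` above):

    import org.apache.calcite.rel.core.JoinRelType
    import org.apache.flink.api.java.operators.join.JoinType

    def toRelType(joinType: JoinType): JoinRelType = joinType match {
      case JoinType.INNER       => JoinRelType.INNER
      case JoinType.LEFT_OUTER  => JoinRelType.LEFT
      case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
      case JoinType.FULL_OUTER  => JoinRelType.FULL
    }

    assert(toRelType(JoinType.LEFT_OUTER) == JoinRelType.LEFT)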

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
new file mode 100644
index 0000000..3883b14
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+
+import scala.collection.JavaConverters._
+
+trait CommonAggregate {
+
+  private[flink] def groupingToString(inputType: RelDataType, grouping: 
Array[Int]): String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    grouping.map( inFields(_) ).mkString(", ")
+  }
+
+  private[flink] def aggregationToString(
+      inputType: RelDataType,
+      grouping: Array[Int],
+      rowType: RelDataType,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      namedProperties: Seq[NamedWindowProperty])
+    : String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    val outFields = rowType.getFieldNames.asScala
+
+    val groupStrings = grouping.map( inFields(_) )
+
+    val aggs = namedAggregates.map(_.getKey)
+    val aggStrings = aggs.map( a => s"${a.getAggregation}(${
+      if (a.getArgList.size() > 0) {
+        inFields(a.getArgList.get(0))
+      } else {
+        "*"
+      }
+    })")
+
+    val propStrings = namedProperties.map(_.property.toString)
+
+    (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
+      case (f, o) => if (f == o) {
+        f
+      } else {
+        s"$f AS $o"
+      }
+    }.mkString(", ")
+  }
+}
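
These helpers only render plan labels for `explain()` output. A self-contained mimic of `groupingToString`'s index-to-name resolution (assumption: plain Scala collections instead of Calcite's `RelDataType`):

    // resolve grouping key indexes against the input field names
    def groupingToString(inFields: Seq[String], grouping: Array[Int]): String =
      grouping.map(inFields(_)).mkString(", ")

    groupingToString(Seq("user", "product", "amount"), Array(0, 1)) // "user, product"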

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
new file mode 100644
index 0000000..3f46258
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, 
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait CommonCalc {
+
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      inputType: TypeInformation[Row],
+      rowType: RelDataType,
+      calcProgram: RexProgram,
+      config: TableConfig)
+    : String = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val condition = calcProgram.getCondition
+    val expandedExpressions = calcProgram.getProjectList.map(
+      expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames,
+      expandedExpressions)
+
+    // only projection
+    if (condition == null) {
+      s"""
+        |${projection.code}
+        |${generator.collectorTerm}.collect(${projection.resultTerm});
+        |""".stripMargin
+    }
+    else {
+      val filterCondition = generator.generateExpression(
+        calcProgram.expandLocalRef(calcProgram.getCondition))
+      // only filter
+      if (projection == null) {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          |  ${generator.collectorTerm}.collect(${generator.input1Term});
+          |}
+          |""".stripMargin
+      }
+      // both filter and projection
+      else {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          |  ${projection.code}
+          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+          |}
+          |""".stripMargin
+      }
+    }
+  }
+
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
+    : RichFlatMapFunction[Row, Row] = {
+
+    new FlatMapRunner[Row, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+  }
+
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): 
String = {
+
+    val cond = calcProgram.getCondition
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+
+    if (cond != null) {
+      expression(cond, inFields, Some(localExprs))
+    } else {
+      ""
+    }
+  }
+
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): 
String = {
+
+    val proj = calcProgram.getProjectList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs)))
+      .zip(outFields).map { case (e, o) =>
+        if (e != o) {
+          e + " AS " + o
+        } else {
+          e
+        }
+    }.mkString(", ")
+  }
+
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val conditionStr = conditionToString(calcProgram, expression)
+    val selectionStr = selectionToString(calcProgram, expression)
+
+    s"${if (calcProgram.getCondition != null) {
+      s"where: ($conditionStr), "
+    } else {
+      ""
+    }}select: ($selectionStr)"
+  }
+
+  private[flink] def calcToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val name = calcOpName(calcProgram, expression)
+    s"Calc($name)"
+  }
+}
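
`selectionToString` adds an `AS` alias only when the rendered expression differs from the output field name; the same zip/rename logic on plain strings (a sketch detached from `RexProgram`):

    def selectionToString(exprs: Seq[String], outFields: Seq[String]): String =
      exprs.zip(outFields).map {
        case (e, o) if e != o => s"$e AS $o"
        case (e, _)           => e
      }.mkString(", ")

    selectionToString(Seq("a", "a + 1"), Seq("a", "b")) // "a, a + 1 AS b"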

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
new file mode 100644
index 0000000..61b7ffb
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, 
NO_CODE}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, 
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, 
TableFunctionCollector}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * Joins an input table with a user-defined table function.
+  */
+trait CommonCorrelate {
+
+  /**
+    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
+    * and user-defined table function.
+    */
+  private[flink] def correlateMapFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]], // POJO field mapping of the UDTF return type
+      ruleDescription: String)
+    : CorrelateFlatMapRunner[Row, Row] = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val flatMap = generateFunction(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      joinType,
+      rexCall,
+      pojoFieldMapping,
+      ruleDescription)
+
+    val collector = generateCollector(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+
+  }
+
+  /**
+    * Generates the flat map function to run the user-defined table function.
+    */
+  private def generateFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      pojoFieldMapping: Option[Array[Int]],
+      ruleDescription: String)
+    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+
+    val functionGenerator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = 
functionGenerator.generateCorrelateAccessExprs
+
+    val collectorTerm = functionGenerator
+      .addReusableConstructor(classOf[TableFunctionCollector[_]])
+      .head
+
+    val call = functionGenerator.generateExpression(rexCall)
+    var body =
+      s"""
+        |${call.resultTerm}.setCollector($collectorTerm);
+        |${call.code}
+        |""".stripMargin
+
+    if (joinType == SemiJoinType.LEFT) {
+      // left outer join
+
+      // in case of a left outer join, if the table function returns an empty result,
+      // fill all fields of the row with null
+      val input2NullExprs = input2AccessExprs.map { x =>
+        GeneratedExpression(
+          primitiveDefaultValue(x.resultType),
+          ALWAYS_NULL,
+          NO_CODE,
+          x.resultType)
+      }
+      val outerResultExpr = functionGenerator.generateResultExpression(
+        input1AccessExprs ++ input2NullExprs, returnType, 
rowType.getFieldNames.asScala)
+      body +=
+        s"""
+          |boolean hasOutput = $collectorTerm.isCollected();
+          |if (!hasOutput) {
+          |  ${outerResultExpr.code}
+          |  
${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+          |}
+          |""".stripMargin
+    } else if (joinType != SemiJoinType.INNER) {
+      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate 
join.")
+    }
+
+    functionGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      body,
+      returnType)
+  }
+
+  /**
+    * Generates table function collector.
+    */
+  private[flink] def generateCollector(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]])
+    : GeneratedCollector = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = 
generator.generateCorrelateAccessExprs
+
+    val crossResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames.asScala)
+
+    val collectorCode = if (condition.isEmpty) {
+      s"""
+        |${crossResultExpr.code}
+        |getCollector().collect(${crossResultExpr.resultTerm});
+        |""".stripMargin
+    } else {
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+      filterGenerator.input1Term = filterGenerator.input2Term
+      val filterCondition = filterGenerator.generateExpression(condition.get)
+      s"""
+        |${filterGenerator.reuseInputUnboxingCode()}
+        |${filterCondition.code}
+        |if (${filterCondition.resultTerm}) {
+        |  ${crossResultExpr.code}
+        |  getCollector().collect(${crossResultExpr.resultTerm});
+        |}
+        |""".stripMargin
+    }
+
+    generator.generateTableFunctionCollector(
+      "TableFunctionCollector",
+      collectorCode,
+      udtfTypeInfo)
+  }
+
+  private[flink] def selectToString(rowType: RelDataType): String = {
+    rowType.getFieldNames.asScala.mkString(",")
+  }
+
+  private[flink] def correlateOpName(
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      rowType: RelDataType)
+    : String = {
+
+    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
+  }
+
+  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: 
TableSqlFunction): String = {
+    val udtfName = sqlFunction.getName
+    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+    s"table($udtfName($operands))"
+  }
+
+}
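
At the API level, this trait compiles the two shapes of a table-function join. A hypothetical UDTF for orientation (the join/leftOuterJoin call names depend on the Table API version and are shown as comments only):

    import org.apache.flink.table.functions.TableFunction

    // hypothetical UDTF: emits one row per comma-separated token
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = s.split(",").foreach(collect)
    }

    // val split = new Split
    // table.join(split('text) as 'token)          // SemiJoinType.INNER
    // table.leftOuterJoin(split('text) as 'token) // SemiJoinType.LEFT: input row is
    //                                             // emitted null-padded if the UDTF
    //                                             // collects nothing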

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
new file mode 100644
index 0000000..274b602
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.types.Row
+
+/**
+  * Common class for batch and stream scans.
+  */
+trait CommonScan {
+
+  /**
+    * Checks whether the input type is exactly the same as the internal row type.
+    * A conversion is necessary if the types differ.
+    */
+  private[flink] def needsConversion(
+      externalTypeInfo: TypeInformation[Any],
+      internalTypeInfo: TypeInformation[Row])
+    : Boolean = {
+
+    externalTypeInfo != internalTypeInfo
+  }
+
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Row] = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputType,
+      None,
+      inputPojoFieldMapping)
+    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Row]],
+      body,
+      expectedType)
+
+    new MapRunner[Any, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+  }
+
+}
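
Worth noting: needsConversion is a plain structural equality test, since TypeInformation implements equals. A standalone sketch of the forward-or-convert decision, using only flink-core types (illustration only, not part of this commit):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    object NeedsConversionExample extends App {
      val internal: TypeInformation[Row] =
        new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
      val external: TypeInformation[_] =
        new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

      // Mirrors CommonScan.needsConversion: convert only if the types differ.
      println(external != internal) // false -> forward without a conversion mapper
    }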

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
deleted file mode 100644
index 7290594..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-
-import scala.collection.JavaConverters._
-
-trait FlinkAggregate {
-
-  private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
-
-    val inFields = inputType.getFieldNames.asScala
-    grouping.map( inFields(_) ).mkString(", ")
-  }
-
-  private[flink] def aggregationToString(
-      inputType: RelDataType,
-      grouping: Array[Int],
-      rowType: RelDataType,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      namedProperties: Seq[NamedWindowProperty])
-    : String = {
-
-    val inFields = inputType.getFieldNames.asScala
-    val outFields = rowType.getFieldNames.asScala
-
-    val groupStrings = grouping.map( inFields(_) )
-
-    val aggs = namedAggregates.map(_.getKey)
-    val aggStrings = aggs.map( a => s"${a.getAggregation}(${
-      if (a.getArgList.size() > 0) {
-        inFields(a.getArgList.get(0))
-      } else {
-        "*"
-      }
-    })")
-
-    val propStrings = namedProperties.map(_.property.toString)
-
-    (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
-      case (f, o) => if (f == o) {
-        f
-      } else {
-        s"$f AS $o"
-      }
-    }.mkString(", ")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index 5ebd3ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
-import org.apache.flink.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
-  private[flink] def functionBody(
-    generator: CodeGenerator,
-    inputType: TypeInformation[Any],
-    rowType: RelDataType,
-    calcProgram: RexProgram,
-    config: TableConfig,
-    expectedType: Option[TypeInformation[Any]]): String = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val condition = calcProgram.getCondition
-    val expandedExpressions = calcProgram.getProjectList.map(
-      expr => calcProgram.expandLocalRef(expr))
-    val projection = generator.generateResultExpression(
-      returnType,
-      rowType.getFieldNames,
-      expandedExpressions)
-
-      // only projection
-      if (condition == null) {
-        s"""
-          |${projection.code}
-          |${generator.collectorTerm}.collect(${projection.resultTerm});
-          |""".stripMargin
-      }
-      else {
-        val filterCondition = generator.generateExpression(
-          calcProgram.expandLocalRef(calcProgram.getCondition))
-        // only filter
-        if (projection == null) {
-          // conversion
-          if (inputType != returnType) {
-            val conversion = generator.generateConverterResultExpression(
-              returnType,
-              rowType.getFieldNames)
-
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${conversion.code}
-              |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-              |}
-              |""".stripMargin
-          }
-          // no conversion
-          else {
-            s"""
-              |${filterCondition.code}
-              |if (${filterCondition.resultTerm}) {
-              |  ${generator.collectorTerm}.collect(${generator.input1Term});
-              |}
-              |""".stripMargin
-          }
-        }
-        // both filter and projection
-        else {
-          s"""
-            |${filterCondition.code}
-            |if (${filterCondition.resultTerm}) {
-            |  ${projection.code}
-            |  ${generator.collectorTerm}.collect(${projection.resultTerm});
-            |}
-            |""".stripMargin
-        }
-      }
-    }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
-    new FlatMapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-  }
-
-  private[flink] def conditionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val cond = calcProgram.getCondition
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-
-    if (cond != null) {
-      expression(cond, inFields, Some(localExprs))
-    } else {
-      ""
-    }
-  }
-
-  private[flink] def selectionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val proj = calcProgram.getProjectList.asScala.toList
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
-    proj
-      .map(expression(_, inFields, Some(localExprs)))
-      .zip(outFields).map { case (e, o) => {
-      if (e != o) {
-        e + " AS " + o
-      } else {
-        e
-      }
-    }
-    }.mkString(", ")
-  }
-
-  private[flink] def calcOpName(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val conditionStr = conditionToString(calcProgram, expression)
-    val selectionStr = selectionToString(calcProgram, expression)
-
-    s"${if (calcProgram.getCondition != null) {
-      s"where: ($conditionStr), "
-    } else {
-      ""
-    }}select: ($selectionStr)"
-  }
-
-  private[flink] def calcToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val name = calcOpName(calcProgram, expression)
-    s"Calc($name)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c986602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
-import org.apache.flink.table.typeutils.TypeConverter._
-import org.apache.flink.table.api.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Join a user-defined table function
-  */
-trait FlinkCorrelate {
-
-  /**
-    * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-    * and user-defined table function.
-    */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      expectedType: Option[TypeInformation[Any]],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Any, Any] = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val flatMap = generateFunction(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      joinType,
-      rexCall,
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Any, Any](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
-
-  /**
-    * Generates the flat map function to run the user-defined table function.
-    */
-  private def generateFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Any, Any]] = {
-
-    val functionGenerator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
-
-    val collectorTerm = functionGenerator
-      .addReusableConstructor(classOf[TableFunctionCollector[_]])
-      .head
-
-    val call = functionGenerator.generateExpression(rexCall)
-    var body =
-      s"""
-        |${call.resultTerm}.setCollector($collectorTerm);
-        |${call.code}
-        |""".stripMargin
-
-    if (joinType == SemiJoinType.LEFT) {
-      // left outer join
-
-      // in case of left outer join and the returned row of table function is empty,
-      // fill all fields of row with null
-      val input2NullExprs = input2AccessExprs.map { x =>
-        GeneratedExpression(
-          primitiveDefaultValue(x.resultType),
-          ALWAYS_NULL,
-          NO_CODE,
-          x.resultType)
-      }
-      val outerResultExpr = functionGenerator.generateResultExpression(
-        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
-      body +=
-        s"""
-          |boolean hasOutput = $collectorTerm.isCollected();
-          |if (!hasOutput) {
-          |  ${outerResultExpr.code}
-          |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
-          |}
-          |""".stripMargin
-    } else if (joinType != SemiJoinType.INNER) {
-      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
-    }
-
-    functionGenerator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-  }
-
-  /**
-    * Generates table function collector.
-    */
-  private[flink] def generateCollector(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedCollector = {
-
-    val generator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
-    val crossResultExpr = generator.generateResultExpression(
-      input1AccessExprs ++ input2AccessExprs,
-      returnType,
-      rowType.getFieldNames.asScala)
-
-    val collectorCode = if (condition.isEmpty) {
-      s"""
-        |${crossResultExpr.code}
-        |getCollector().collect(${crossResultExpr.resultTerm});
-        |""".stripMargin
-    } else {
-      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
-      filterGenerator.input1Term = filterGenerator.input2Term
-      val filterCondition = filterGenerator.generateExpression(condition.get)
-      s"""
-        |${filterGenerator.reuseInputUnboxingCode()}
-        |${filterCondition.code}
-        |if (${filterCondition.resultTerm}) {
-        |  ${crossResultExpr.code}
-        |  getCollector().collect(${crossResultExpr.resultTerm});
-        |}
-        |""".stripMargin
-    }
-
-    generator.generateTableFunctionCollector(
-      "TableFunctionCollector",
-      collectorCode,
-      udtfTypeInfo)
-  }
-
-  private[flink] def selectToString(rowType: RelDataType): String = {
-    rowType.getFieldNames.asScala.mkString(",")
-  }
-
-  private[flink] def correlateOpName(
-      rexCall: RexCall,
-      sqlFunction: TableSqlFunction,
-      rowType: RelDataType)
-    : String = {
-
-    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: 
${selectToString(rowType)}"
-  }
-
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
-    val udtfName = sqlFunction.getName
-    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
-    s"table($udtfName($operands))"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index a7765d1..7ad9bd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -66,43 +66,6 @@ trait FlinkRel {
     }
   }
 
-  private[flink] def getConversionMapper(
-      config: TableConfig,
-      nullableInput: Boolean,
-      inputType: TypeInformation[Any],
-      expectedType: TypeInformation[Any],
-      conversionOperatorName: String,
-      fieldNames: Seq[String],
-      inputPojoFieldMapping: Option[Array[Int]] = None)
-    : MapFunction[Any, Any] = {
-
-    val generator = new CodeGenerator(
-      config,
-      nullableInput,
-      inputType,
-      None,
-      inputPojoFieldMapping)
-    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
-    val body =
-      s"""
-         |${conversion.code}
-         |return ${conversion.resultTerm};
-         |""".stripMargin
-
-    val genFunction = generator.generateFunction(
-      conversionOperatorName,
-      classOf[MapFunction[Any, Any]],
-      body,
-      expectedType)
-
-    new MapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-
-  }
-
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
     val fieldList = rowType.getFieldList
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 252bb2e..09262a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -36,6 +36,7 @@ abstract class BatchScan(
     traitSet: RelTraitSet,
     table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
+  with CommonScan
   with DataSetRel {
 
   override def toString: String = {
@@ -48,50 +49,34 @@ abstract class BatchScan(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
   }
 
-  protected def convertToExpectedType(
+  protected def convertToInternalRow(
       input: DataSet[Any],
       flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataSet[Any] = {
+      config: TableConfig)
+    : DataSet[Row] = {
 
     val inputType = input.getType
 
-    expectedType match {
+    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
+    // conversion
+    if (needsConversion(inputType, internalType)) {
 
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
+        internalType,
+        "DataSetSourceConversion",
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
-        // conversion
-        if (determinedType != inputType) {
+      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-          val mapFunc = getConversionMapper(
-            config,
-            nullableInput = false,
-            inputType,
-            determinedType,
-            "DataSetSourceConversion",
-            getRowType.getFieldNames,
-            Some(flinkTable.fieldIndexes))
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
+      input.map(mapFunc).name(opName)
+    }
+    // no conversion necessary, forward
+    else {
+      input.asInstanceOf[DataSet[Row]]
     }
   }
 }
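
The net effect: the old expectedType/getEfficientTypeUsage branching (including the PojoType special case) collapses to "convert iff the source type differs from the internal Row type". A toy illustration of the conversion step, with a hand-written MapFunction standing in for the code-generated MapRunner (source values and field names are made up):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    object ScanConversionExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // External source representation (stand-in for a TableSource's DataSet).
        val source = env.fromElements("1,hello", "2,world")

        val internalType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

        // Convert to the internal row type and name the operator after the
        // scanned fields, as convertToInternalRow does with the generated mapper.
        val rows = source
          .map(new MapFunction[String, Row] {
            override def map(s: String): Row = {
              val parts = s.split(",")
              val row = new Row(2)
              row.setField(0, parts(0).toInt)
              row.setField(1, parts(1))
              row
            }
          })
          .returns(internalType)
          .name("from: (a, b)")

        rows.print()
      }
    }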

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 73dddc6..9b8e1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
@@ -62,13 +63,11 @@ class BatchTableSourceScan(
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", 
"))
   }
 
-  override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
 
-    convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
   }
 }
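
With expectedType gone, every batch node shares the same one-argument translation contract and always emits the internal row type; conversion to the user-facing type is applied separately (cf. the TableEnvironment changes in this commit's diffstat). A hedged, abbreviated sketch of the contract, matching the overrides above (the real trait carries more members):

    import org.apache.flink.api.java.DataSet
    import org.apache.flink.table.api.BatchTableEnvironment
    import org.apache.flink.types.Row

    // Abbreviated sketch of the reworked batch translation contract.
    trait DataSetRelSketch {
      def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row]
    }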

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index 6771536..206e562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.table.typeutils.TypeConverter
-import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
  * Flink RelNode that corresponds to a LogicalAggregate.
   */
@@ -49,7 +45,7 @@ class DataSetAggregate(
     grouping: Array[Int],
     inGroupingSet: Boolean)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -89,9 +85,7 @@ class DataSetAggregate(
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -109,15 +103,7 @@ class DataSetAggregate(
       grouping,
       inGroupingSet)
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
-    .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-    .toArray
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
     val prepareOpName = s"prepare select: ($aggString)"
@@ -125,46 +111,26 @@ class DataSetAggregate(
       .map(mapFunction)
       .name(prepareOpName)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
-    val result = {
-      if (groupingKeys.length > 0) {
-        // grouped aggregation
-        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-          s"select: ($aggString)"
-
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .groupBy(groupingKeys: _*)
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-      else {
-        // global aggregation
-        val aggOpName = s"select:($aggString)"
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-    }
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    if (groupingKeys.length > 0) {
+      // grouped aggregation
+      val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+        s"select: ($aggString)"
 
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataSetAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-        .name(mapName)
-      case _ => result
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
+    }
+    else {
+      // global aggregation
+      val aggOpName = s"select:($aggString)"
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
     }
   }
 }
