[FLINK-7337] [table] Refactor internal handling of time indicator attributes.

- Expand physical Row schema for time indicators.
- Refactor computation of the logical schema of imported tables.
- Refactor operators to use the time attribute in the Row instead of the StreamRecord timestamp.

This closes #4488.
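For context, a minimal standalone sketch of the marker-injection idea applied when a DataStream is registered (see adjustFieldIndexes below). The marker constants used here are placeholder values for illustration only; the commit uses TimeIndicatorTypeInfo.ROWTIME_MARKER and TimeIndicatorTypeInfo.PROCTIME_MARKER.

object TimeIndicatorInjectionSketch {
  // placeholder marker values, assumed for illustration
  val ROWTIME_MARKER: Int = -1
  val PROCTIME_MARKER: Int = -2

  // splice rowtime/proctime markers into the physical field indexes
  // at their declared positions, mirroring adjustFieldIndexes in the patch
  def injectIndexes(
      fieldIndexes: Array[Int],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[Int] = {
    val withRowtime = rowtime
      .map(rt => fieldIndexes.patch(rt._1, Seq(ROWTIME_MARKER), 0))
      .getOrElse(fieldIndexes)
    proctime
      .map(pt => withRowtime.patch(pt._1, Seq(PROCTIME_MARKER), 0))
      .getOrElse(withRowtime)
  }

  def main(args: Array[String]): Unit = {
    // physical fields 0..2, rowtime appended at position 3, proctime at position 4
    val adjusted = injectIndexes(Array(0, 1, 2), Some((3, "rowtime")), Some((4, "proctime")))
    println(adjusted.mkString(", "))  // prints: 0, 1, 2, -1, -2
  }
}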


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93d0ae4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93d0ae4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93d0ae4a

Branch: refs/heads/master
Commit: 93d0ae4a9f059da4bd2b720f7503da0f9c0a8c7c
Parents: 6ed5815
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Aug 4 02:20:56 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Aug 23 10:09:21 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 155 ++++-
 .../flink/table/api/TableEnvironment.scala      |   8 +-
 .../flink/table/calcite/FlinkTypeFactory.scala  |  37 +-
 .../calcite/RelTimeIndicatorConverter.scala     |  67 +--
 .../flink/table/codegen/CodeGenerator.scala     |  78 ++-
 .../table/codegen/calls/ScalarOperators.scala   |  44 +-
 .../table/functions/ProctimeSqlFunction.scala   |  41 ++
 .../TimeMaterializationSqlFunction.scala        |  41 --
 .../flink/table/plan/nodes/CommonCalc.scala     |   6 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  20 +-
 .../plan/nodes/PhysicalTableSourceScan.scala    |   4 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |   4 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |  12 +-
 .../nodes/datastream/DataStreamCorrelate.scala  |   6 +-
 .../datastream/DataStreamGroupAggregate.scala   |  34 +-
 .../DataStreamGroupWindowAggregate.scala        |  84 +--
 .../datastream/DataStreamOverAggregate.scala    | 149 ++---
 .../plan/nodes/datastream/DataStreamScan.scala  |   2 +-
 .../plan/nodes/datastream/DataStreamSort.scala  |  20 +-
 .../plan/nodes/datastream/DataStreamUnion.scala |   6 +-
 .../nodes/datastream/DataStreamValues.scala     |  12 +-
 .../nodes/datastream/DataStreamWindowJoin.scala |  16 +-
 .../plan/nodes/datastream/StreamScan.scala      |  36 +-
 .../datastream/StreamTableSourceScan.scala      |  23 +-
 .../logical/FlinkLogicalTableSourceScan.scala   |  23 +-
 .../datastream/DataStreamWindowJoinRule.scala   |   2 +-
 .../table/plan/schema/DataStreamTable.scala     |  13 -
 .../flink/table/plan/schema/FlinkTable.scala    |  10 +-
 .../flink/table/plan/schema/RowSchema.scala     | 124 +---
 .../plan/schema/StreamTableSourceTable.scala    |  48 +-
 .../runtime/CRowInputTupleOutputMapRunner.scala |   2 +
 .../table/runtime/CRowOutputMapRunner.scala     |  60 --
 .../table/runtime/CRowOutputProcessRunner.scala |  72 +++
 .../TimestampSetterProcessFunction.scala        |  52 ++
 ...WrappingTimestampSetterProcessFunction.scala |  61 ++
 .../table/runtime/aggregate/AggregateUtil.scala |  54 +-
 ...SetSessionWindowAggReduceGroupFunction.scala |   5 +-
 ...taSetSlideWindowAggReduceGroupFunction.scala |   5 +-
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   2 +-
 ...rementalAggregateAllTimeWindowFunction.scala |  15 +-
 ...IncrementalAggregateTimeWindowFunction.scala |  19 +-
 .../aggregate/ProcTimeBoundedRangeOver.scala    |   4 +
 .../aggregate/ProcTimeSortProcessFunction.scala |   8 +-
 .../aggregate/RowTimeBoundedRangeOver.scala     |   9 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |   9 +-
 .../aggregate/RowTimeSortProcessFunction.scala  |  20 +-
 .../aggregate/RowTimeUnboundedOver.scala        |  19 +-
 .../table/runtime/aggregate/SortUtil.scala      |   5 +-
 .../aggregate/TimeWindowPropertyCollector.scala |  23 +-
 .../table/runtime/join/WindowJoinUtil.scala     |  11 +-
 .../table/typeutils/TimeIndicatorTypeInfo.scala |   3 +
 .../flink/table/api/stream/sql/SortTest.scala   |   4 +-
 .../api/stream/table/TableSourceTest.scala      |   4 +-
 .../plan/TimeIndicatorConversionTest.scala      |  29 +-
 .../table/runtime/harness/HarnessTestBase.scala |  22 +-
 .../table/runtime/harness/JoinHarnessTest.scala |   4 +-
 .../runtime/harness/NonWindowHarnessTest.scala  |   4 +-
 .../runtime/harness/OverWindowHarnessTest.scala | 586 ++++++++-----------
 .../SortProcessFunctionHarnessTest.scala        | 129 ++--
 .../runtime/stream/table/TableSinkITCase.scala  | 140 ++++-
 60 files changed, 1362 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 7328b2a..c4e1450 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,10 +23,8 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, 
RelDataTypeFieldImpl, RelRecordType}
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.functions.MapFunction
@@ -38,19 +36,19 @@ import 
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.RelTimeIndicatorConverter
+import org.apache.flink.table.calcite.{FlinkTypeFactory, 
RelTimeIndicatorConverter}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, 
UpdateAsRetractionTrait, _}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, 
UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, 
StreamTableSourceTable}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, 
CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
+import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, 
CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner, 
WrappingTimestampSetterProcessFunction}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, 
RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, 
StreamTableSource, TableSource}
-import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
@@ -270,11 +268,11 @@ abstract class StreamTableEnvironment(
     *                     valid Java class identifier.
     */
   private def getConversionMapperWithChanges[OUT](
-    physicalTypeInfo: TypeInformation[CRow],
-    schema: RowSchema,
-    requestedTypeInfo: TypeInformation[OUT],
-    functionName: String):
-  MapFunction[CRow, OUT] = {
+      physicalTypeInfo: TypeInformation[CRow],
+      schema: RowSchema,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+    MapFunction[CRow, OUT] = {
 
     requestedTypeInfo match {
 
@@ -356,9 +354,7 @@ abstract class StreamTableEnvironment(
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
       fieldIndexes,
-      fieldNames,
-      None,
-      None
+      fieldNames
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -393,12 +389,14 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
 
+    // adjust field indexes and field names
+    val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
+    val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
+
     val dataStreamTable = new DataStreamTable[T](
       dataStream,
-      fieldIndexes,
-      fieldNames,
-      rowtime,
-      proctime
+      indexesWithIndicatorFields,
+      namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
@@ -502,6 +500,63 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Injects markers for time indicator fields into the field indexes.
+    *
+    * @param fieldIndexes The field indexes into which the time indicator 
markers are injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field indexes.
+    */
+  private def adjustFieldIndexes(
+    fieldIndexes: Array[Int],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[Int] = {
+
+    // inject rowtime field
+    val withRowtime = rowtime match {
+      case Some(rt) => fieldIndexes.patch(rt._1, 
Seq(TimeIndicatorTypeInfo.ROWTIME_MARKER), 0)
+      case _ => fieldIndexes
+    }
+
+    // inject proctime field
+    val withProctime = proctime match {
+      case Some(pt) => withRowtime.patch(pt._1, 
Seq(TimeIndicatorTypeInfo.PROCTIME_MARKER), 0)
+      case _ => withRowtime
+    }
+
+    withProctime
+  }
+
+  /**
+    * Injects names of time indicator fields into the list of field names.
+    *
+    * @param fieldNames The array of field names into which the time indicator 
field names are
+    *                   injected.
+    * @param rowtime An optional rowtime indicator
+    * @param proctime An optional proctime indicator
+    * @return An adjusted array of field names.
+    */
+  private def adjustFieldNames(
+    fieldNames: Array[String],
+    rowtime: Option[(Int, String)],
+    proctime: Option[(Int, String)]): Array[String] = {
+
+    // inject rowtime field
+    val withRowtime = rowtime match {
+      case Some(rt) => fieldNames.patch(rt._1, Seq(rowtime.get._2), 0)
+      case _ => fieldNames
+    }
+
+    // inject proctime field
+    val withProctime = proctime match {
+      case Some(pt) => withRowtime.patch(pt._1, Seq(proctime.get._2), 0)
+      case _ => withRowtime
+    }
+
+    withProctime
+  }
+
+  /**
     * Returns the decoration rule set for this environment
     * including a custom RuleSet configuration.
     */
@@ -632,10 +687,21 @@ abstract class StreamTableEnvironment(
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
 
-    // we convert the logical row type to the output row type
-    val convertedOutputType = 
RelTimeIndicatorConverter.convertOutputType(relNode)
-
-    translate(dataStreamPlan, convertedOutputType, queryConfig, withChangeFlag)
+    // zip original field names with optimized field types
+    val fieldTypes = relNode.getRowType.getFieldList.asScala
+      .zip(dataStreamPlan.getRowType.getFieldList.asScala)
+      // get name of original plan and type of optimized plan
+      .map(x => (x._1.getName, x._2.getType))
+      // add field indexes
+      .zipWithIndex
+      // build new field types
+      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
+
+    // build a record type from list of field types
+    val rowType = new RelRecordType(
+      fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava)
+
+    translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
   }
 
   /**
@@ -684,12 +750,41 @@ abstract class StreamTableEnvironment(
 
     val rootParallelism = plan.getParallelism
 
-    conversion match {
-      case mapFunction: MapFunction[CRow, A] =>
-        plan.map(mapFunction)
-          .returns(tpe)
-          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-          .setParallelism(rootParallelism)
+    val rowtimeFields = logicalType.getFieldList.asScala
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (rowtimeFields.isEmpty) {
+      // no rowtime field to set
+      conversion match {
+        case mapFunction: MapFunction[CRow, A] =>
+          plan.map(mapFunction)
+            .returns(tpe)
+            .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+            .setParallelism(rootParallelism)
+      }
+    } else if (rowtimeFields.size == 1) {
+      // set the only rowtime field as event-time timestamp for DataStream
+      val mapFunction = conversion match {
+        case mapFunction: MapFunction[CRow, A] => mapFunction
+        case _ => new MapFunction[CRow, A] {
+          override def map(cRow: CRow): A = cRow.asInstanceOf[A]
+        }
+      }
+
+      plan.process(
+        new WrappingTimestampSetterProcessFunction[A](
+          mapFunction,
+          rowtimeFields.head.getIndex))
+        .returns(tpe)
+        .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+        .setParallelism(rootParallelism)
+
+    } else {
+      throw new TableException(
+        s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+          s"the table that should be converted to a DataStream.\n" +
+          s"Please select the rowtime field that should be used as event-time 
timestamp for the " +
+          s"DataStream by casting all other fields to TIMESTAMP.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 3bca156..b647c51 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -731,15 +731,15 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     // validate that at least the field types of physical and logical type 
match
     // we do that here to make sure that plan translation was correct
-    if (schema.physicalTypeInfo != inputTypeInfo) {
+    if (schema.typeInfo != inputTypeInfo) {
       throw TableException(
         s"The field types of physical and logical row types do not match. " +
-        s"Physical type is [${schema.physicalTypeInfo}], Logical type is 
[${inputTypeInfo}]. " +
+        s"Physical type is [${schema.typeInfo}], Logical type is 
[${inputTypeInfo}]. " +
         s"This is a bug and should not happen. Please file an issue.")
     }
 
-    val fieldTypes = schema.physicalFieldTypeInfo
-    val fieldNames = schema.physicalFieldNames
+    val fieldTypes = schema.fieldTypeInfos
+    val fieldNames = schema.fieldNames
 
     // validate requested type
     if (requestedTypeInfo.getArity != fieldTypes.length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index dbefe20..637e8cc 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
     *
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's 
[[TypeInformation]]
-    * @param rowtime optional system field to indicate event-time; the index 
determines the index
-    *                in the final record. If the index is smaller than the 
number of specified
-    *                fields, it shifts all following fields.
-    * @param proctime optional system field to indicate processing-time; the 
index determines the
-    *                 index in the final record. If the index is smaller than 
the number of
-    *                 specified fields, it shifts all following fields.
     * @return a struct type with the input fieldNames, input fieldTypes, and 
system fields
     */
   def buildLogicalRowType(
       fieldNames: Seq[String],
-      fieldTypes: Seq[TypeInformation[_]],
-      rowtime: Option[(Int, String)],
-      proctime: Option[(Int, String)])
+      fieldTypes: Seq[TypeInformation[_]])
     : RelDataType = {
     val logicalRowTypeBuilder = builder
 
     val fields = fieldNames.zip(fieldTypes)
-
-    var totalNumberOfFields = fields.length
-    if (rowtime.isDefined) {
-      totalNumberOfFields += 1
-    }
-    if (proctime.isDefined) {
-      totalNumberOfFields += 1
-    }
-
-    var addedTimeAttributes = 0
-    for (i <- 0 until totalNumberOfFields) {
-      if (rowtime.isDefined && rowtime.get._1 == i) {
-        logicalRowTypeBuilder.add(rowtime.get._2, createRowtimeIndicatorType())
-        addedTimeAttributes += 1
-      } else if (proctime.isDefined && proctime.get._1 == i) {
-        logicalRowTypeBuilder.add(proctime.get._2, 
createProctimeIndicatorType())
-        addedTimeAttributes += 1
-      } else {
-        val field = fields(i - addedTimeAttributes)
-        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, 
isNullable = true))
-      }
-    }
+    fields.foreach(f => {
+      // time indicators are not nullable
+      val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
+      logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
+    })
 
     logicalRowTypeBuilder.build
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index eb14291..717a1af 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.calcite
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, 
RelRecordType}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelNode, RelShuttle}
@@ -26,8 +26,8 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType
-import org.apache.flink.table.functions.TimeMaterializationSqlFunction
+import 
org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, _}
+import org.apache.flink.table.functions.ProctimeSqlFunction
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 
@@ -242,9 +242,13 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
         case lp: LogicalProject =>
           val projects = lp.getProjects.zipWithIndex.map { case (expr, idx) =>
             if (isTimeIndicatorType(expr.getType) && refIndices.contains(idx)) 
{
-              rexBuilder.makeCall(
-                TimeMaterializationSqlFunction,
-                expr)
+              if (isRowtimeIndicatorType(expr.getType)) {
+                // cast rowtime indicator to regular timestamp
+                rexBuilder.makeAbstractCast(timestamp, expr)
+              } else {
+                // generate proctime access
+                rexBuilder.makeCall(ProctimeSqlFunction, expr)
+              }
             } else {
               expr
             }
@@ -259,9 +263,17 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
         case _ =>
           val projects = input.getRowType.getFieldList.map { field =>
             if (isTimeIndicatorType(field.getType) && 
refIndices.contains(field.getIndex)) {
-              rexBuilder.makeCall(
-                TimeMaterializationSqlFunction,
-                new RexInputRef(field.getIndex, field.getType))
+              if (isRowtimeIndicatorType(field.getType)) {
+                // cast rowtime indicator to regular timestamp
+                rexBuilder.makeAbstractCast(
+                  timestamp,
+                  new RexInputRef(field.getIndex, field.getType))
+              } else {
+                // generate proctime access
+                rexBuilder.makeCall(
+                  ProctimeSqlFunction,
+                  new RexInputRef(field.getIndex, field.getType))
+              }
             } else {
               new RexInputRef(field.getIndex, field.getType)
             }
@@ -311,19 +323,19 @@ object RelTimeIndicatorConverter {
 
     var needsConversion = false
 
-    // materialize all remaining time indicators
+    // materialize remaining proctime indicators
     val projects = convertedRoot.getRowType.getFieldList.map(field =>
-      if (isTimeIndicatorType(field.getType)) {
+      if (isProctimeIndicatorType(field.getType)) {
         needsConversion = true
         rexBuilder.makeCall(
-          TimeMaterializationSqlFunction,
+          ProctimeSqlFunction,
           new RexInputRef(field.getIndex, field.getType))
       } else {
         new RexInputRef(field.getIndex, field.getType)
       }
     )
 
-    // add final conversion
+    // add final conversion if necessary
     if (needsConversion) {
       LogicalProject.create(
       convertedRoot,
@@ -334,27 +346,6 @@ object RelTimeIndicatorConverter {
     }
   }
 
-  def convertOutputType(rootRel: RelNode): RelDataType = {
-
-    val timestamp = rootRel
-      .getCluster
-      .getRexBuilder
-      .getTypeFactory
-      .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
-
-    // convert all time indicators types to timestamps
-    val fields = rootRel.getRowType.getFieldList.map { field =>
-      if (isTimeIndicatorType(field.getType)) {
-        new RelDataTypeFieldImpl(field.getName, field.getIndex, timestamp)
-      } else {
-        field
-      }
-    }
-
-    new RelRecordType(fields)
-  }
-
   /**
     * Materializes time indicator accesses in an expression.
     *
@@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer(
       case _ =>
         updatedCall.getOperands.map { o =>
           if (isTimeIndicatorType(o.getType)) {
-            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
+            if (isRowtimeIndicatorType(o.getType)) {
+              // cast rowtime indicator to regular timestamp
+              rexBuilder.makeAbstractCast(timestamp, o)
+            } else {
+              // generate proctime access
+              rexBuilder.makeCall(ProctimeSqlFunction, o)
+            }
           } else {
             o
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 28fea59..be55eac 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -37,11 +37,12 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
+import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator}
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.functions.{FunctionContext, 
TimeMaterializationSqlFunction, UserDefinedFunction}
+import org.apache.flink.table.functions.{FunctionContext, ProctimeSqlFunction, 
UserDefinedFunction}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 
 import scala.collection.JavaConversions._
@@ -56,10 +57,11 @@ import scala.collection.mutable
   * @param nullableInput input(s) can be null.
   * @param input1 type information about the first input of the Function
   * @param input2 type information about the second input if the Function is 
binary
-  * @param input1FieldMapping additional mapping information for input1
-  *   (e.g. POJO types have no deterministic field order and some input fields 
might not be read)
-  * @param input2FieldMapping additional mapping information for input2
-  *   (e.g. POJO types have no deterministic field order and some input fields 
might not be read)
+  * @param input1FieldMapping additional mapping information for input1.
+  *   POJO types have no deterministic field order and some input fields might 
not be read.
+  *   The input1FieldMapping is also used to inject time indicator attributes.
+  * @param input2FieldMapping additional mapping information for input2.
+  *   POJO types have no deterministic field order and some input fields might 
not be read.
   */
 abstract class CodeGenerator(
     config: TableConfig,
@@ -245,16 +247,23 @@ abstract class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = input1Mapping.map { idx =>
-      generateInputAccess(input1, input1Term, idx)
+
+    val input1AccessExprs = input1Mapping.map {
+      case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
+        // attribute is a rowtime indicator. Access event-time timestamp in 
StreamRecord.
+        generateRowtimeAccess()
+      case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
+        // attribute is proctime indicator.
+        // We use a null literal and generate a timestamp when we need it.
+        generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
+      case idx =>
+        // regular attribute. Access attribute in input data type.
+        generateInputAccess(input1, input1Term, idx)
     }
 
     val input2AccessExprs = input2 match {
       case Some(ti) =>
-        input2Mapping.map { idx =>
-          generateInputAccess(ti, input2Term, idx)
-        }.toSeq
-
+        input2Mapping.map(idx => generateInputAccess(ti, input2Term, 
idx)).toSeq
       case None => Seq() // add nothing
     }
 
@@ -724,10 +733,8 @@ abstract class CodeGenerator(
   override def visitCall(call: RexCall): GeneratedExpression = {
 
     // special case: time materialization
-    if (call.getOperator == TimeMaterializationSqlFunction) {
-      return generateRecordTimestamp(
-        
FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType)
-      )
+    if (call.getOperator == ProctimeSqlFunction) {
+      return generateProctimeTimestamp()
     }
 
     val resultType = FlinkTypeFactory.toTypeInfo(call.getType)
@@ -967,10 +974,10 @@ abstract class CodeGenerator(
         generateArrayElement(this, array)
 
       case ScalarSqlFunctions.CONCAT =>
-        generateConcat(BuiltInMethods.CONCAT, operands)
+        generateConcat(this.nullCheck, operands)
 
       case ScalarSqlFunctions.CONCAT_WS =>
-        generateConcat(BuiltInMethods.CONCAT_WS, operands)
+        generateConcatWs(operands)
 
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
@@ -1216,8 +1223,14 @@ abstract class CodeGenerator(
         |""".stripMargin
     } else if (nullCheck) {
       s"""
-        |$resultTypeTerm $resultTerm = $unboxedFieldCode;
         |boolean $nullTerm = $fieldTerm == null;
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = $unboxedFieldCode;
+        |}
         |""".stripMargin
     } else {
       s"""
@@ -1270,27 +1283,32 @@ abstract class CodeGenerator(
     }
   }
 
-  private[flink] def generateRecordTimestamp(isEventTime: Boolean): 
GeneratedExpression = {
+  private[flink] def generateRowtimeAccess(): GeneratedExpression = {
     val resultTerm = newName("result")
-    val resultTypeTerm = 
primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val nullTerm = newName("isNull")
 
-    val resultCode = if (isEventTime) {
+    val accessCode =
       s"""
-        |$resultTypeTerm $resultTerm;
-        |if ($contextTerm.timestamp() == null) {
+        |Long $resultTerm = $contextTerm.timestamp();
+        |if ($resultTerm == null) {
         |  throw new RuntimeException("Rowtime timestamp is null. Please make 
sure that a proper " +
         |    "TimestampAssigner is defined and the stream environment uses the 
EventTime time " +
         |    "characteristic.");
         |}
-        |else {
-        |  $resultTerm = $contextTerm.timestamp();
-        |}
-        |""".stripMargin
-    } else {
+        |boolean $nullTerm = false;
+       """.stripMargin
+
+    GeneratedExpression(resultTerm, nullTerm, accessCode, 
TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
+  }
+
+  private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = 
primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+
+    val resultCode =
       s"""
         |$resultTypeTerm $resultTerm = 
$contextTerm.timerService().currentProcessingTime();
         |""".stripMargin
-    }
     GeneratedExpression(resultTerm, NEVER_NULL, resultCode, 
SqlTimeTypeInfo.TIMESTAMP)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 1ab927d..01e9dff 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.table.codegen.calls
 
-import java.lang.reflect.Method
-
 import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
@@ -1026,14 +1024,48 @@ object ScalarOperators {
   }
 
   def generateConcat(
-      method: Method,
-      operands: Seq[GeneratedExpression]): GeneratedExpression = {
+      nullCheck: Boolean,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
 
-    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
-      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
+    generateCallIfArgsNotNull(nullCheck, STRING_TYPE_INFO, operands) {
+      (terms) =>s"${qualifyMethod(BuiltInMethods.CONCAT)}(${terms.mkString(", 
")})"
     }
   }
 
+  def generateConcatWs(operands: Seq[GeneratedExpression]): 
GeneratedExpression = {
+
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val defaultValue = primitiveDefaultValue(Types.STRING)
+
+    val tempTerms = operands.tail.map(_ => newName("temp"))
+
+    val operatorCode =
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |
+        |String $resultTerm;
+        |boolean $nullTerm;
+        |if (${operands.head.nullTerm}) {
+        |  $nullTerm = true;
+        |  $resultTerm = $defaultValue;
+        |} else {
+        |  ${operands.tail.zip(tempTerms).map {
+                case (o: GeneratedExpression, t: String) =>
+                  s"String $t;\n" +
+                  s"  if (${o.nullTerm}) $t = null; else $t = ${o.resultTerm};"
+              }.mkString("\n")
+            }
+        |  $nullTerm = false;
+        |  $resultTerm = ${qualifyMethod(BuiltInMethods.CONCAT_WS)}
+        |   (${operands.head.resultTerm}, ${tempTerms.mkString(", ")});
+        |}
+        |""".stripMargin
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, Types.STRING)
+  }
+
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
new file mode 100644
index 0000000..4fb0378
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.validate.SqlMonotonicity
+
+/**
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical 
calculations.
+  */
+object ProctimeSqlFunction
+  extends SqlFunction(
+    "PROCTIME",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.TIMESTAMP),
+    SqlFunctionCategory.SYSTEM) {
+
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
deleted file mode 100644
index d875026..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions
-
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.validate.SqlMonotonicity
-
-/**
-  * Function that materializes a time attribute to the metadata timestamp. 
After materialization
-  * the result can be used in regular arithmetical calculations.
-  */
-object TimeMaterializationSqlFunction
-  extends SqlFunction(
-    "TIME_MATERIALIZATION",
-    SqlKind.OTHER_FUNCTION,
-    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
-    InferTypes.RETURN_TYPE,
-    OperandTypes.family(SqlTypeFamily.TIMESTAMP),
-    SqlFunctionCategory.SYSTEM) {
-
-  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
-
-  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
-    SqlMonotonicity.INCREASING
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 3e355ff..2f1871b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -42,8 +42,8 @@ trait CommonCalc {
     GeneratedFunction[T, Row] = {
 
     val projection = generator.generateResultExpression(
-      returnSchema.physicalTypeInfo,
-      returnSchema.physicalFieldNames,
+      returnSchema.typeInfo,
+      returnSchema.fieldNames,
       calcProjection)
 
     // only projection
@@ -80,7 +80,7 @@ trait CommonCalc {
       ruleDescription,
       functionClass,
       body,
-      returnSchema.physicalTypeInfo)
+      returnSchema.typeInfo)
   }
 
   private[flink] def conditionToString(

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 96aaf3e..7c01fde 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -53,12 +53,10 @@ trait CommonCorrelate {
     functionClass: Class[T]):
   GeneratedFunction[T, Row] = {
 
-    val physicalRexCall = inputSchema.mapRexNode(rexCall)
-
     val functionGenerator = new FunctionCodeGenerator(
       config,
       false,
-      inputSchema.physicalTypeInfo,
+      inputSchema.typeInfo,
       Some(udtfTypeInfo),
       None,
       pojoFieldMapping)
@@ -69,7 +67,7 @@ trait CommonCorrelate {
       .addReusableConstructor(classOf[TableFunctionCollector[_]])
       .head
 
-    val call = functionGenerator.generateExpression(physicalRexCall)
+    val call = functionGenerator.generateExpression(rexCall)
     var body =
       s"""
          |${call.resultTerm}.setCollector($collectorTerm);
@@ -90,8 +88,8 @@ trait CommonCorrelate {
       }
       val outerResultExpr = functionGenerator.generateResultExpression(
         input1AccessExprs ++ input2NullExprs,
-        returnSchema.physicalTypeInfo,
-        returnSchema.physicalFieldNames)
+        returnSchema.typeInfo,
+        returnSchema.fieldNames)
       body +=
         s"""
            |boolean hasOutput = $collectorTerm.isCollected();
@@ -108,7 +106,7 @@ trait CommonCorrelate {
       ruleDescription,
       functionClass,
       body,
-      returnSchema.physicalTypeInfo)
+      returnSchema.typeInfo)
   }
 
   /**
@@ -126,7 +124,7 @@ trait CommonCorrelate {
     val generator = new CollectorCodeGenerator(
       config,
       false,
-      inputSchema.physicalTypeInfo,
+      inputSchema.typeInfo,
       Some(udtfTypeInfo),
       None,
       pojoFieldMapping)
@@ -135,8 +133,8 @@ trait CommonCorrelate {
 
     val crossResultExpr = generator.generateResultExpression(
       input1AccessExprs ++ input2AccessExprs,
-      returnSchema.physicalTypeInfo,
-      returnSchema.physicalFieldNames)
+      returnSchema.typeInfo,
+      returnSchema.fieldNames)
 
     val collectorCode = if (condition.isEmpty) {
       s"""
@@ -148,7 +146,7 @@ trait CommonCorrelate {
       // adjust indices of InputRefs to adhere to schema expected by generator
       val changeInputRefIndexShuttle = new RexShuttle {
         override def visitInputRef(inputRef: RexInputRef): RexNode = {
-          new RexInputRef(inputSchema.physicalArity + inputRef.getIndex, 
inputRef.getType)
+          new RexInputRef(inputSchema.arity + inputRef.getIndex, 
inputRef.getType)
         }
       }
       // Run generateExpression to add init statements (ScalarFunctions) of 
condition to generator.

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index dc7a0d6..5872d8c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -39,9 +39,7 @@ abstract class PhysicalTableSourceScan(
     val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     flinkTypeFactory.buildLogicalRowType(
       TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType),
-      None,
-      None)
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index fb291e4..74aac43 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -42,9 +42,7 @@ class BatchTableSourceScan(
     val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     flinkTypeFactory.buildLogicalRowType(
       TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType),
-      None,
-      None)
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 2e00330..45e6902 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -52,7 +52,7 @@ class DataStreamCalc(
   with CommonCalc
   with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: 
RexProgram): Calc = {
     new DataStreamCalc(
@@ -100,7 +100,7 @@ class DataStreamCalc(
     val condition = if (calcProgram.getCondition != null) {
       val materializedCondition = RelTimeIndicatorConverter.convertExpression(
         calcProgram.expandLocalRef(calcProgram.getCondition),
-        inputSchema.logicalType,
+        inputSchema.relDataType,
         cluster.getRexBuilder)
       Some(materializedCondition)
     } else {
@@ -110,12 +110,8 @@ class DataStreamCalc(
     // filter out time attributes
     val projection = calcProgram.getProjectList.asScala
       .map(calcProgram.expandLocalRef)
-      // time indicator fields must not be part of the code generation
-      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
-      // update indices
-      .map(expr => inputSchema.mapRexNode(expr))
 
-    val generator = new FunctionCodeGenerator(config, false, 
inputSchema.physicalTypeInfo)
+    val generator = new FunctionCodeGenerator(config, false, 
inputSchema.typeInfo)
 
     val genFunction = generateFunction(
       generator,
@@ -132,7 +128,7 @@ class DataStreamCalc(
     val processFunc = new CRowProcessRunner(
       genFunction.name,
       genFunction.code,
-      CRowTypeInfo(schema.physicalTypeInfo))
+      CRowTypeInfo(schema.typeInfo))
 
     inputDataStream
       .process(processFunc)

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index b7165cd..18ab2a3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -50,7 +50,7 @@ class DataStreamCorrelate(
   with CommonCorrelate
   with DataStreamRel {
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType() = schema.relDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
     new DataStreamCorrelate(
@@ -78,7 +78,7 @@ class DataStreamCorrelate(
     super.explainTerms(pw)
       .item("invocation", scan.getCall)
       .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
-      .item("rowType", schema.logicalType)
+      .item("rowType", schema.relDataType)
       .item("joinType", joinType)
       .itemIf("condition", condition.orNull, condition.isDefined)
   }
@@ -130,7 +130,7 @@ class DataStreamCorrelate(
       .process(processFunc)
       // preserve input parallelism to ensure that acc and retract messages 
remain in order
       .setParallelism(inputParallelism)
-      .name(correlateOpName(rexCall, sqlFunction, schema.logicalType))
+      .name(correlateOpName(rexCall, sqlFunction, schema.relDataType))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 12694fc..590d9be 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -59,7 +59,7 @@ class DataStreamGroupAggregate(
 
   private val LOG = LoggerFactory.getLogger(this.getClass)
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType() = schema.relDataType
 
   override def needsUpdatesAsRetraction = true
 
@@ -83,20 +83,20 @@ class DataStreamGroupAggregate(
   override def toString: String = {
     s"Aggregate(${
       if (!groupings.isEmpty) {
-        s"groupBy: (${groupingToString(inputSchema.logicalType, groupings)}), "
+        s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), "
       } else {
         ""
       }
     }select:(${aggregationToString(
-      inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil)}))"
+      inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))"
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
       .itemIf("groupBy", groupingToString(
-        inputSchema.logicalType, groupings), !groupings.isEmpty)
+        inputSchema.relDataType, groupings), !groupings.isEmpty)
       .item("select", aggregationToString(
-        inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
+        inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil))
   }
 
   override def translateToPlan(
@@ -112,37 +112,29 @@ class DataStreamGroupAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
 
-    val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
-      new CalcitePair[AggregateCall, String](
-        inputSchema.mapAggregateCall(namedAggregate.left),
-        namedAggregate.right)
-    }
-
-    val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
+    val outRowType = CRowTypeInfo(schema.typeInfo)
 
     val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
-      inputSchema.physicalTypeInfo)
+      inputSchema.typeInfo)
 
     val aggString = aggregationToString(
-      inputSchema.logicalType,
+      inputSchema.relDataType,
       groupings,
       getRowType,
       namedAggregates,
       Nil)
 
-    val keyedAggOpName = s"groupBy: 
(${groupingToString(inputSchema.logicalType, groupings)}), " +
+    val keyedAggOpName = s"groupBy: 
(${groupingToString(inputSchema.relDataType, groupings)}), " +
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"select: ($aggString)"
 
-    val physicalGrouping = groupings.map(inputSchema.mapIndex)
-
     val processFunction = AggregateUtil.createGroupAggregateFunction(
       generator,
-      physicalNamedAggregates,
-      inputSchema.logicalType,
-      inputSchema.physicalFieldTypeInfo,
+      namedAggregates,
+      inputSchema.relDataType,
+      inputSchema.fieldTypeInfos,
       groupings,
       queryConfig,
       DataStreamRetractionRules.isAccRetract(this),
@@ -150,7 +142,7 @@ class DataStreamGroupAggregate(
 
     val result: DataStream[CRow] =
     // grouped / keyed aggregation
-      if (physicalGrouping.nonEmpty) {
+      if (groupings.nonEmpty) {
         inputDS
         .keyBy(groupings: _*)
         .process(processFunction)

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c4ffdb1..ac63be1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -32,11 +32,13 @@ import 
org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
+import org.apache.flink.table.expressions.ResolvedFieldReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.schema.RowSchema
 import 
org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.runtime.TimestampSetterProcessFunction
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -58,7 +60,7 @@ class DataStreamGroupWindowAggregate(
 
   private val LOG = LoggerFactory.getLogger(this.getClass)
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
   override def needsUpdatesAsRetraction = true
 
@@ -84,14 +86,14 @@ class DataStreamGroupWindowAggregate(
   override def toString: String = {
     s"Aggregate(${
       if (!grouping.isEmpty) {
-        s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), "
+        s"groupBy: (${groupingToString(inputSchema.relDataType, grouping)}), "
       } else {
         ""
       }
     }window: ($window), " +
       s"select: (${
         aggregationToString(
-          inputSchema.logicalType,
+          inputSchema.relDataType,
           grouping,
           getRowType,
           namedAggregates,
@@ -101,13 +103,13 @@ class DataStreamGroupWindowAggregate(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .itemIf("groupBy", groupingToString(inputSchema.logicalType, grouping), 
!grouping.isEmpty)
+      .itemIf("groupBy", groupingToString(inputSchema.relDataType, grouping), 
!grouping.isEmpty)
       .item("window", window)
       .item(
         "select", aggregationToString(
-          inputSchema.logicalType,
+          inputSchema.relDataType,
           grouping,
-          schema.logicalType,
+          schema.relDataType,
           namedAggregates,
           namedProperties))
   }
@@ -118,14 +120,6 @@ class DataStreamGroupWindowAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
-    val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
-      new CalcitePair[AggregateCall, String](
-        inputSchema.mapAggregateCall(namedAggregate.left),
-        namedAggregate.right)
-    }
-    val physicalNamedProperties = namedProperties
-      .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType))
-
     val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input)
 
     if (inputIsAccRetract) {
@@ -148,16 +142,30 @@ class DataStreamGroupWindowAggregate(
         "state size. You may specify a retention time of 0 to not clean up the 
state.")
     }
 
-    val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
+    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
+      // copy the window rowtime attribute into the StreamRecord timestamp field
+      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute)
+
+      inputDS
+        .process(
+          new TimestampSetterProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo)))
+        .setParallelism(inputDS.getParallelism)
+        .name(s"time attribute: ($timeAttribute)")
+    } else {
+      inputDS
+    }
+
+    val outRowType = CRowTypeInfo(schema.typeInfo)
 
     val aggString = aggregationToString(
-      inputSchema.logicalType,
+      inputSchema.relDataType,
       grouping,
-      schema.logicalType,
+      schema.relDataType,
       namedAggregates,
       namedProperties)
 
-    val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " +
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.relDataType, grouping)}), " +
       s"window: ($window), " +
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
@@ -165,24 +173,22 @@ class DataStreamGroupWindowAggregate(
     val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
-      inputSchema.physicalTypeInfo)
+      inputSchema.typeInfo)
 
     val needMerge = window match {
       case SessionGroupWindow(_, _, _) => true
       case _ => false
     }
-    val physicalGrouping = grouping.map(inputSchema.mapIndex)
-
     // grouped / keyed aggregation
-    if (physicalGrouping.length > 0) {
+    if (grouping.length > 0) {
       val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
         window,
-        physicalGrouping.length,
-        physicalNamedAggregates.size,
-        schema.physicalArity,
-        physicalNamedProperties)
+        grouping.length,
+        namedAggregates.size,
+        schema.arity,
+        namedProperties)
 
-      val keyedStream = inputDS.keyBy(physicalGrouping: _*)
+      val keyedStream = timestampedInput.keyBy(grouping: _*)
       val windowedStream =
         createKeyedWindowedStream(queryConfig, window, keyedStream)
           .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
@@ -190,11 +196,11 @@ class DataStreamGroupWindowAggregate(
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
-          physicalNamedAggregates,
-          inputSchema.physicalType,
-          inputSchema.physicalFieldTypeInfo,
-          schema.physicalType,
-          physicalGrouping,
+          namedAggregates,
+          inputSchema.relDataType,
+          inputSchema.fieldTypeInfos,
+          schema.relDataType,
+          grouping,
           needMerge)
 
       windowedStream
@@ -205,20 +211,20 @@ class DataStreamGroupWindowAggregate(
     else {
       val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
         window,
-        schema.physicalArity,
-        physicalNamedProperties)
+        schema.arity,
+        namedProperties)
 
       val windowedStream =
-        createNonKeyedWindowedStream(queryConfig, window, inputDS)
+        createNonKeyedWindowedStream(queryConfig, window, timestampedInput)
           .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
-          physicalNamedAggregates,
-          inputSchema.physicalType,
-          inputSchema.physicalFieldTypeInfo,
-          schema.physicalType,
+          namedAggregates,
+          inputSchema.relDataType,
+          inputSchema.fieldTypeInfos,
+          schema.relDataType,
           Array[Int](),
           needMerge)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 34a7fd8..7bf342a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -50,7 +50,7 @@ class DataStreamOverAggregate(
   with DataStreamRel {
   private val LOG = LoggerFactory.getLogger(this.getClass)
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
   override def needsUpdatesAsRetraction = true
 
@@ -78,15 +78,15 @@ class DataStreamOverAggregate(
 
     super.explainTerms(pw)
       .itemIf("partitionBy",
-        partitionToString(schema.logicalType, partitionKeys), partitionKeys.nonEmpty)
+        partitionToString(schema.relDataType, partitionKeys), partitionKeys.nonEmpty)
       .item("orderBy",
-        orderingToString(schema.logicalType, overWindow.orderKeys.getFieldCollations))
+        orderingToString(schema.relDataType, overWindow.orderKeys.getFieldCollations))
      .itemIf("rows", windowRange(logicWindow, overWindow, inputNode), overWindow.isRows)
      .itemIf("range", windowRange(logicWindow, overWindow, inputNode), !overWindow.isRows)
       .item(
         "select", aggregationToString(
-          inputSchema.logicalType,
-          schema.logicalType,
+          inputSchema.relDataType,
+          schema.relDataType,
           namedAggregates))
   }
 
@@ -134,67 +134,44 @@ class DataStreamOverAggregate(
     val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
-      inputSchema.physicalTypeInfo)
+      inputSchema.typeInfo)
 
-    val timeType = schema.logicalType
+    val timeType = schema.relDataType
       .getFieldList
       .get(orderKey.getFieldIndex)
       .getType
 
-    timeType match {
-      case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
-        // proc-time OVER window
-        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // unbounded OVER window
-          createUnboundedAndCurrentRowOverWindow(
-            queryConfig,
-            generator,
-            inputDS,
-            isRowTimeType = false,
-            isRowsClause = overWindow.isRows)
-        } else if (
-          overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
-            overWindow.upperBound.isCurrentRow) {
-
-          // bounded OVER window
-          createBoundedAndCurrentRowOverWindow(
-            queryConfig,
-            generator,
-            inputDS,
-            isRowTimeType = false,
-            isRowsClause = overWindow.isRows)
-        } else {
-          throw new TableException(
-            "OVER RANGE FOLLOWING windows are not supported yet.")
-        }
-
-      case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) =>
-        // row-time OVER window
-        if (overWindow.lowerBound.isPreceding &&
-          overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // unbounded OVER window
-          createUnboundedAndCurrentRowOverWindow(
-            queryConfig,
-            generator,
-            inputDS,
-            isRowTimeType = true,
-            isRowsClause = overWindow.isRows)
-        } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
-          // bounded OVER window
-          createBoundedAndCurrentRowOverWindow(
-            queryConfig,
-            generator,
-            inputDS,
-            isRowTimeType = true,
-            isRowsClause = overWindow.isRows)
-        } else {
-          throw new TableException(
-            "OVER RANGE FOLLOWING windows are not supported yet.")
-        }
-
-      case _ =>
-        throw new TableException(
-          s"OVER windows can only be applied on time attributes.")
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(s"OVER windows can only be applied on time 
attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding && overWindow.lowerBound.isUnbounded &&
+        overWindow.upperBound.isCurrentRow) {
+      // unbounded OVER window
+      createUnboundedAndCurrentRowOverWindow(
+        queryConfig,
+        generator,
+        inputDS,
+        rowTimeIdx,
+        isRowsClause = overWindow.isRows)
+    } else if (
+      overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+        overWindow.upperBound.isCurrentRow) {
+
+      // bounded OVER window
+      createBoundedAndCurrentRowOverWindow(
+        queryConfig,
+        generator,
+        inputDS,
+        rowTimeIdx,
+        isRowsClause = overWindow.isRows)
+    } else {
+      throw new TableException("OVER RANGE FOLLOWING windows are not supported 
yet.")
     }
   }
 
@@ -202,31 +179,26 @@ class DataStreamOverAggregate(
     queryConfig: StreamQueryConfig,
     generator: AggregationCodeGenerator,
     inputDS: DataStream[CRow],
-    isRowTimeType: Boolean,
+    rowTimeIdx: Option[Int],
     isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
 
-    val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
 
-    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
-      namedAggregate =>
-        new CalcitePair[AggregateCall, String](
-          schema.mapAggregateCall(namedAggregate.left),
-          namedAggregate.right)
-    }
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     // get the output types
-    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+    val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
 
     val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
       generator,
       namedAggregates,
-      inputSchema.physicalType,
-      inputSchema.physicalTypeInfo,
-      inputSchema.physicalFieldTypeInfo,
+      inputSchema.relDataType,
+      inputSchema.typeInfo,
+      inputSchema.fieldTypeInfos,
       queryConfig,
-      isRowTimeType,
+      rowTimeIdx,
       partitionKeys.nonEmpty,
       isRowsClause)
 
@@ -254,34 +226,29 @@ class DataStreamOverAggregate(
     queryConfig: StreamQueryConfig,
     generator: AggregationCodeGenerator,
     inputDS: DataStream[CRow],
-    isRowTimeType: Boolean,
+    rowTimeIdx: Option[Int],
     isRowsClause: Boolean): DataStream[CRow] = {
 
     val overWindow: Group = logicWindow.groups.get(0)
-    val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
-    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
-      namedAggregate =>
-        new CalcitePair[AggregateCall, String](
-          schema.mapAggregateCall(namedAggregate.left),
-          namedAggregate.right)
-    }
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
 
     val precedingOffset =
      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
 
     // get the output types
-    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+    val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
 
     val processFunction = AggregateUtil.createBoundedOverProcessFunction(
       generator,
       namedAggregates,
-      inputSchema.physicalType,
-      inputSchema.physicalTypeInfo,
-      inputSchema.physicalFieldTypeInfo,
+      inputSchema.relDataType,
+      inputSchema.typeInfo,
+      inputSchema.fieldTypeInfos,
       precedingOffset,
       queryConfig,
       isRowsClause,
-      isRowTimeType
+      rowTimeIdx
     )
     val result: DataStream[CRow] =
     // partitioned aggregation
@@ -318,18 +285,18 @@ class DataStreamOverAggregate(
 
     s"over: (${
       if (!partitionKeys.isEmpty) {
-        s"PARTITION BY: ${partitionToString(inputSchema.logicalType, 
partitionKeys)}, "
+        s"PARTITION BY: ${partitionToString(inputSchema.relDataType, 
partitionKeys)}, "
       } else {
         ""
       }
-    }ORDER BY: ${orderingToString(inputSchema.logicalType,
+    }ORDER BY: ${orderingToString(inputSchema.relDataType,
         overWindow.orderKeys.getFieldCollations)}, " +
       s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
       s"${windowRange(logicWindow, overWindow, inputNode)}, " +
       s"select: (${
         aggregationToString(
-          inputSchema.logicalType,
-          schema.logicalType,
+          inputSchema.relDataType,
+          schema.relDataType,
           namedAggregates)
       }))"
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 424c6a2..9352efb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -43,7 +43,7 @@ class DataStreamScan(
 
  val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
index a11e6c1..8f9942f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
@@ -53,7 +53,7 @@ class DataStreamSort(
   with CommonSort
   with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
   override def copy(
     traitSet: RelTraitSet,
@@ -75,13 +75,13 @@ class DataStreamSort(
   }
 
   override def toString: String = {
-    sortToString(schema.logicalType, sortCollation, sortOffset, sortFetch)
+    sortToString(schema.relDataType, sortCollation, sortOffset, sortFetch)
   }
   
   override def explainTerms(pw: RelWriter) : RelWriter = {
     sortExplainTerms(
       pw.input("input", getInput()),
-      schema.logicalType,
+      schema.relDataType,
       sortCollation, 
       sortOffset, 
       sortFetch)
@@ -94,7 +94,7 @@ class DataStreamSort(
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
     
    // need to identify time between others order fields. Time needs to be first sort element
-    val timeType = SortUtil.getFirstSortField(sortCollation, schema.logicalType).getType
+    val timeType = SortUtil.getFirstSortField(sortCollation, schema.relDataType).getType
     
     // time ordering needs to be ascending
     if (SortUtil.getFirstSortDirection(sortCollation) != Direction.ASCENDING) {
@@ -141,15 +141,15 @@ class DataStreamSort(
     inputDS: DataStream[CRow],
     execCfg: ExecutionConfig): DataStream[CRow] = {
 
-   val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+   val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
     
     // if the order has secondary sorting fields in addition to the proctime
     if (sortCollation.getFieldCollations.size() > 1) {
     
       val processFunction = SortUtil.createProcTimeSortFunction(
         sortCollation,
-        inputSchema.logicalType, 
-        inputSchema.physicalTypeInfo, 
+        inputSchema.relDataType,
+        inputSchema.typeInfo,
         execCfg)
       
       inputDS.keyBy(new NullByteKeySelector[CRow])
@@ -173,12 +173,12 @@ class DataStreamSort(
     inputDS: DataStream[CRow],
     execCfg: ExecutionConfig): DataStream[CRow] = {
 
-    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+    val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
        
     val processFunction = SortUtil.createRowTimeSortFunction(
       sortCollation,
-      inputSchema.logicalType, 
-      inputSchema.physicalTypeInfo, 
+      inputSchema.relDataType,
+      inputSchema.typeInfo,
       execCfg)
       
     inputDS.keyBy(new NullByteKeySelector[CRow])

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 6f4980a..7258ec8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -38,7 +38,7 @@ class DataStreamUnion(
   extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataStreamRel {
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType() = schema.relDataType
 
  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamUnion(
@@ -55,7 +55,7 @@ class DataStreamUnion(
   }
 
   override def toString = {
-    s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
+    s"Union All(union: (${schema.fieldNames.mkString(", ")}))"
   }
 
   override def translateToPlan(
@@ -68,6 +68,6 @@ class DataStreamUnion(
   }
 
   private def unionSelectionToString: String = {
-    schema.logicalFieldNames.mkString(", ")
+    schema.fieldNames.mkString(", ")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 1476681..1ef9107 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -41,10 +41,10 @@ class DataStreamValues(
     schema: RowSchema,
     tuples: ImmutableList[ImmutableList[RexLiteral]],
     ruleDescription: String)
-  extends Values(cluster, schema.logicalType, tuples, traitSet)
+  extends Values(cluster, schema.relDataType, tuples, traitSet)
   with DataStreamRel {
 
-  override def deriveRowType() = schema.logicalType
+  override def deriveRowType() = schema.relDataType
 
  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamValues(
@@ -62,14 +62,14 @@ class DataStreamValues(
 
     val config = tableEnv.getConfig
 
-    val returnType = CRowTypeInfo(schema.physicalTypeInfo)
+    val returnType = CRowTypeInfo(schema.typeInfo)
     val generator = new InputFormatCodeGenerator(config)
 
     // generate code for every record
     val generatedRecords = getTuples.asScala.map { r =>
       generator.generateResultExpression(
-        schema.physicalTypeInfo,
-        schema.physicalFieldNames,
+        schema.typeInfo,
+        schema.fieldNames,
         r.asScala)
     }
 
@@ -77,7 +77,7 @@ class DataStreamValues(
     val generatedFunction = generator.generateValuesInputFormat(
       ruleDescription,
       generatedRecords.map(_.code),
-      schema.physicalTypeInfo)
+      schema.typeInfo)
 
     val inputFormat = new CRowValuesInputFormat(
       generatedFunction.name,

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 987947c..f8015b3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -54,7 +54,7 @@ class DataStreamWindowJoin(
     with CommonJoin
     with DataStreamRel {
 
-  override def deriveRowType(): RelDataType = schema.logicalType
+  override def deriveRowType(): RelDataType = schema.relDataType
 
  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamWindowJoin(
@@ -76,7 +76,7 @@ class DataStreamWindowJoin(
 
   override def toString: String = {
     joinToString(
-      schema.logicalType,
+      schema.relDataType,
       joinCondition,
       joinType,
       getExpressionString)
@@ -85,7 +85,7 @@ class DataStreamWindowJoin(
   override def explainTerms(pw: RelWriter): RelWriter = {
     joinExplainTerms(
       super.explainTerms(pw),
-      schema.logicalType,
+      schema.relDataType,
       joinCondition,
       joinType,
       getExpressionString)
@@ -117,8 +117,8 @@ class DataStreamWindowJoin(
     WindowJoinUtil.generateJoinFunction(
       config,
       joinType,
-      leftSchema.physicalTypeInfo,
-      rightSchema.physicalTypeInfo,
+      leftSchema.typeInfo,
+      rightSchema.typeInfo,
       schema,
       remainCondition,
       ruleDescription)
@@ -160,13 +160,13 @@ class DataStreamWindowJoin(
       leftKeys: Array[Int],
       rightKeys: Array[Int]): DataStream[CRow] = {
 
-    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+    val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
 
     val procInnerJoinFunc = new ProcTimeWindowInnerJoin(
       leftLowerBound,
       leftUpperBound,
-      leftSchema.physicalTypeInfo,
-      rightSchema.physicalTypeInfo,
+      leftSchema.typeInfo,
+      rightSchema.typeInfo,
       joinFunctionName,
       joinFunctionCode)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 25e72fa..4aca856 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.runtime.CRowOutputMapRunner
+import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 import scala.collection.JavaConverters._
@@ -40,29 +41,42 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel {
     : DataStream[CRow] = {
 
     val inputType = input.getType
-    val internalType = CRowTypeInfo(schema.physicalTypeInfo)
+    val internalType = CRowTypeInfo(schema.typeInfo)
 
     // conversion
     if (needsConversion(input.getType, internalType)) {
 
-      val function = generatedConversionFunction(
+      val generator = new FunctionCodeGenerator(
         config,
-        classOf[MapFunction[Any, Row]],
+        false,
         inputType,
-        schema.physicalTypeInfo,
-        "DataStreamSourceConversion",
-        schema.physicalFieldNames,
+        None,
         Some(flinkTable.fieldIndexes))
 
-      val mapFunc = new CRowOutputMapRunner(
+      val conversion = generator.generateConverterResultExpression(
+        schema.typeInfo,
+        schema.fieldNames)
+
+      val body =
+        s"""
+           |${conversion.code}
+           |${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |""".stripMargin
+
+      val function = generator.generateFunction(
+        "DataStreamSourceConversion",
+        classOf[ProcessFunction[Any, Row]],
+        body,
+        schema.typeInfo)
+
+      val processFunc = new CRowOutputProcessRunner(
         function.name,
         function.code,
         internalType)
 
      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-      // TODO we need a ProcessFunction here
-      input.map(mapFunc).name(opName).returns(internalType)
+      input.process(processFunc).name(opName).returns(internalType)
     }
     // no conversion necessary, forward
     else {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 72ecac5..663b276 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.sources._
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -46,29 +47,29 @@ class StreamTableSourceScan(
     val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
     val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-    val fieldCnt = fieldNames.length
+    val fields = fieldNames.zip(fieldTypes)
 
-    val rowtime = tableSource match {
+    val withRowtime = tableSource match {
      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
-        Some((fieldCnt, rowtimeAttribute))
+        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
       case _ =>
-        None
+        fields
     }
 
-    val proctime = tableSource match {
+    val withProctime = tableSource match {
      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
-        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case _ =>
-        None
+        withRowtime
     }
 
+    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip
+
     flinkTypeFactory.buildLogicalRowType(
-      fieldNames,
-      fieldTypes,
-      rowtime,
-      proctime)
+      fieldNamesWithIndicators,
+      fieldTypesWithIndicators)
   }
 
  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
