http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 3ae949e..470d006 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 import scala.collection.JavaConverters._
 
@@ -51,29 +52,29 @@ class FlinkLogicalTableSourceScan(
     val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
     val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-    val fieldCnt = fieldNames.length
+    val fields = fieldNames.zip(fieldTypes)
 
-    val rowtime = tableSource match {
+    val withRowtime = tableSource match {
       case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
-        Some((fieldCnt, rowtimeAttribute))
+        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
       case _ =>
-        None
+        fields
     }
 
-    val proctime = tableSource match {
+    val withProctime = tableSource match {
       case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
         val proctimeAttribute = timeSource.getProctimeAttribute
-        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case _ =>
-        None
+        withRowtime
     }
 
+    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip
+
     flinkTypeFactory.buildLogicalRowType(
-      fieldNames,
-      fieldTypes,
-      rowtime,
-      proctime)
+      fieldNamesWithIndicators,
+      fieldTypesWithIndicators)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
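
Note: the hunk above drops the optional (index, name) tuples for rowtime and
proctime; time indicators are appended as ordinary (name, type) pairs, so the
type factory only needs field names and types. A minimal, self-contained sketch
of the zip/append/unzip pattern, with stand-in strings instead of Flink's
TypeInformation:

    val fieldNames = List("id", "amount")
    val fieldTypes = List("LONG", "DOUBLE")

    val fields = fieldNames.zip(fieldTypes)
    // the rowtime indicator is appended as just another (name, type) pair
    val withRowtime = fields :+ ("rowtime", "ROWTIME_INDICATOR")

    val (names, types) = withRowtime.unzip
    // names: List(id, amount, rowtime)
    // types: List(LONG, DOUBLE, ROWTIME_INDICATOR)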

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index 2075689..7dfcbc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -87,7 +87,7 @@ class DataStreamWindowJoinRule
     val (windowBounds, remainCondition) =
       WindowJoinUtil.extractWindowBoundsFromPredicate(
         joinInfo.getRemaining(join.getCluster.getRexBuilder),
-        leftRowSchema.logicalArity,
+        leftRowSchema.arity,
         join.getRowType,
         join.getCluster.getRexBuilder,
         TableConfig.DEFAULT)

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index 70054b4..b7021e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -18,27 +18,14 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 
 class DataStreamTable[T](
     val dataStream: DataStream[T],
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String],
-    val rowtime: Option[(Int, String)],
-    val proctime: Option[(Int, String)],
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) {
 
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-
-    flinkTypeFactory.buildLogicalRowType(
-      fieldNames,
-      fieldTypes,
-      rowtime,
-      proctime)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index 752b00e..df56ae6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 abstract class FlinkTable[T](
     val typeInfo: TypeInformation[T],
@@ -49,12 +50,15 @@ abstract class FlinkTable[T](
     typeInfo match {
       case cType: CompositeType[_] =>
         // it is ok to leave out fields
-        if (fieldNames.length > cType.getArity) {
+        if (fieldIndexes.count(_ >= 0) > cType.getArity) {
           throw new TableException(
           s"Arity of type (" + cType.getFieldNames.deep + ") " +
             "must not be greater than number of field names " + 
fieldNames.deep + ".")
         }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+        fieldIndexes.map {
+          case TimeIndicatorTypeInfo.ROWTIME_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+          case TimeIndicatorTypeInfo.PROCTIME_MARKER => TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+          case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
       case aType: AtomicType[_] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
           throw new TableException(
@@ -65,7 +69,7 @@ abstract class FlinkTable[T](
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes, None, None)
+    flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
   }
 
   /**
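
Note: negative marker indexes stand in for time-indicator fields that have no
physical counterpart in the input type, which is also why the arity check above
counts only non-negative indexes. A small sketch of the mapping, assuming
distinct negative marker constants (the real values live in
TimeIndicatorTypeInfo):

    // hypothetical marker values, for illustration only
    val ROWTIME_MARKER = -1
    val PROCTIME_MARKER = -2

    val physicalTypes = Array("LONG", "STRING") // stand-in for cType.getTypeAt(i)
    val fieldIndexes = Array(0, 1, ROWTIME_MARKER, PROCTIME_MARKER)

    val fieldTypes = fieldIndexes.map {
      case ROWTIME_MARKER  => "ROWTIME_INDICATOR"  // no physical field to look up
      case PROCTIME_MARKER => "PROCTIME_INDICATOR"
      case i               => physicalTypes(i)
    }
    // fieldTypes: Array(LONG, STRING, ROWTIME_INDICATOR, PROCTIME_INDICATOR)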

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
index ccbe44d..ad0f552 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -18,14 +18,10 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType}
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.functions.TimeMaterializationSqlFunction
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
@@ -35,127 +31,35 @@ import scala.collection.JavaConversions._
   */
 class RowSchema(private val logicalRowType: RelDataType) {
 
-  private lazy val physicalRowFields: Seq[RelDataTypeField] = logicalRowType.getFieldList filter {
-    field => !FlinkTypeFactory.isTimeIndicatorType(field.getType)
-  }
-
-  private lazy val physicalRowType: RelDataType = new RelRecordType(physicalRowFields)
-
-  private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = physicalRowFields map { f =>
-    FlinkTypeFactory.toTypeInfo(f.getType)
-  }
-
-  private lazy val physicalRowFieldNames: Seq[String] = physicalRowFields.map(_.getName)
+  private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] =
+    logicalRowType.getFieldList map { f => FlinkTypeFactory.toTypeInfo(f.getType) }
 
   private lazy val physicalRowTypeInfo: TypeInformation[Row] = new RowTypeInfo(
-    physicalRowFieldTypes.toArray, physicalRowFieldNames.toArray)
-
-  private lazy val indexMapping: Array[Int] = generateIndexMapping
-
-  private lazy val inputRefUpdater = new RexInputRefUpdater()
-
-  private def generateIndexMapping: Array[Int] = {
-    val mapping = new Array[Int](logicalRowType.getFieldCount)
-    var countTimeIndicators = 0
-    var i = 0
-    while (i < logicalRowType.getFieldCount) {
-      val t = logicalRowType.getFieldList.get(i).getType
-      if (FlinkTypeFactory.isTimeIndicatorType(t)) {
-        countTimeIndicators += 1
-        // no mapping
-        mapping(i) = -1
-      } else {
-        mapping(i) = i - countTimeIndicators
-      }
-      i += 1
-    }
-    mapping
-  }
-
-  private class RexInputRefUpdater extends RexShuttle {
-
-    override def visitInputRef(inputRef: RexInputRef): RexNode = {
-      new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
-    }
-
-    override def visitCall(call: RexCall): RexNode = call.getOperator match {
-      // we leave time indicators unchanged yet
-      // the index becomes invalid but right now we are only
-      // interested in the type of the input reference
-      case TimeMaterializationSqlFunction => call
-      case _ => super.visitCall(call)
-    }
-  }
-
-  /**
-    * Returns the arity of the logical record.
-    */
-  def logicalArity: Int = logicalRowType.getFieldCount
-
-  /**
-    * Returns the arity of the physical record.
-    */
-  def physicalArity: Int = physicalTypeInfo.getArity
-
-  /**
-    * Returns a logical [[RelDataType]] including logical fields (i.e. time indicators).
-    */
-  def logicalType: RelDataType = logicalRowType
-
-  /**
-    * Returns a physical [[RelDataType]] with no logical fields (i.e. time indicators).
-    */
-  def physicalType: RelDataType = physicalRowType
-
-  /**
-    * Returns a physical [[TypeInformation]] of row with no logical fields (i.e. time indicators).
-    */
-  def physicalTypeInfo: TypeInformation[Row] = physicalRowTypeInfo
-
-  /**
-    * Returns [[TypeInformation]] of the row's fields with no logical fields (i.e. time indicators).
-    */
-  def physicalFieldTypeInfo: Seq[TypeInformation[_]] = physicalRowFieldTypes
+    physicalRowFieldTypes.toArray, fieldNames.toArray)
 
   /**
-    * Returns the logical fields names including logical fields (i.e. time indicators).
+    * Returns the arity of the schema.
     */
-  def logicalFieldNames: Seq[String] = logicalRowType.getFieldNames
+  def arity: Int = logicalRowType.getFieldCount
 
   /**
-    * Returns the physical fields names with no logical fields (i.e. time indicators).
+    * Returns the [[RelDataType]] of the schema.
     */
-  def physicalFieldNames: Seq[String] = physicalRowFieldNames
+  def relDataType: RelDataType = logicalRowType
 
   /**
-    * Converts logical indices to physical indices based on this schema.
+    * Returns the [[TypeInformation]] of the schema.
     */
-  def mapIndex(logicalIndex: Int): Int = {
-    val mappedIndex = indexMapping(logicalIndex)
-    if (mappedIndex < 0) {
-      throw new TableException("Invalid access to a logical field.")
-    } else {
-      mappedIndex
-    }
-  }
+  def typeInfo: TypeInformation[Row] = physicalRowTypeInfo
 
   /**
-    * Converts logical indices of a aggregate call to physical ones.
+    * Returns the [[TypeInformation]] of the fields of the schema.
     */
-  def mapAggregateCall(logicalAggCall: AggregateCall): AggregateCall = {
-    logicalAggCall.copy(
-      logicalAggCall.getArgList.map(mapIndex(_).asInstanceOf[Integer]),
-      if (logicalAggCall.filterArg < 0) {
-        logicalAggCall.filterArg
-      } else {
-        mapIndex(logicalAggCall.filterArg)
-      }
-    )
-  }
+  def fieldTypeInfos: Seq[TypeInformation[_]] = physicalRowFieldTypes
 
   /**
-    * Converts logical field references of a [[RexNode]] to physical ones.
+    * Returns the field names.
     */
-  def mapRexNode(logicalRexNode: RexNode): RexNode = logicalRexNode.accept(inputRefUpdater)
+  def fieldNames: Seq[String] = logicalRowType.getFieldNames
 
 }
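
Note: with time indicators carried as regular fields, the logical/physical split
(and the index remapping it required) collapses into a single schema. A hedged
usage sketch of the slimmed-down API, assuming some rowType: RelDataType is
given:

    val schema = new RowSchema(rowType)
    val n      = schema.arity          // field count, time indicators included
    val names  = schema.fieldNames     // Seq[String]
    val types  = schema.fieldTypeInfos // Seq[TypeInformation[_]]
    val rowTI  = schema.typeInfo       // TypeInformation[Row] over all fields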

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index 408381d..dc1f31a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 class StreamTableSourceTable[T](
     override val tableSource: TableSource[T],
@@ -36,41 +37,38 @@ class StreamTableSourceTable[T](
     val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
     val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-    val fieldCnt = fieldNames.length
+    val fields = fieldNames.zip(fieldTypes)
 
-    val rowtime = tableSource match {
-      case nullTimeSource : DefinedRowtimeAttribute
-        if nullTimeSource.getRowtimeAttribute == null =>
-          None
-      case emptyStringTimeSource: DefinedRowtimeAttribute
-        if emptyStringTimeSource.getRowtimeAttribute.trim.equals("")  =>
-          throw TableException("The name of the rowtime attribute must not be 
empty.")
-      case timeSource: DefinedRowtimeAttribute  =>
+    val withRowtime = tableSource match {
+      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null =>
+        fields
+      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") =>
+        throw TableException("The name of the rowtime attribute must not be empty.")
+      case timeSource: DefinedRowtimeAttribute =>
         val rowtimeAttribute = timeSource.getRowtimeAttribute
-        Some((fieldCnt, rowtimeAttribute))
+        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
       case _ =>
-        None
+        fields
     }
 
-    val proctime = tableSource match {
-      case nullTimeSource : DefinedProctimeAttribute
-        if nullTimeSource.getProctimeAttribute == null =>
-          None
-      case emptyStringTimeSource: DefinedProctimeAttribute
-        if emptyStringTimeSource.getProctimeAttribute.trim.equals("")  =>
-          throw TableException("The name of the proctime attribute must not be 
empty.")
-      case timeSource: DefinedProctimeAttribute  =>
+    val withProctime = tableSource match {
+      case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null =>
+        withRowtime
+      case timeSource: DefinedProctimeAttribute
+        if timeSource.getProctimeAttribute.trim.equals("") =>
+        throw TableException("The name of the rowtime attribute must not be 
empty.")
+      case timeSource: DefinedProctimeAttribute =>
         val proctimeAttribute = timeSource.getProctimeAttribute
-        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case _ =>
-        None
+        withRowtime
     }
 
+    val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip
+
     flinkTypeFactory.buildLogicalRowType(
-      fieldNames,
-      fieldTypes,
-      rowtime,
-      proctime)
+      fieldNamesWithIndicators,
+      fieldTypesWithIndicators)
 
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
index 7c96437..6b3aa44 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -19,7 +19,9 @@
 package org.apache.flink.table.runtime
 
 import java.lang.{Boolean => JBool}
+import java.sql.Timestamp
 
+import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
deleted file mode 100644
index cb8f695..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
-  * MapRunner with [[CRow]] output.
-  */
-class CRowOutputMapRunner(
-    name: String,
-    code: String,
-    @transient var returnType: TypeInformation[CRow])
-  extends RichMapFunction[Any, CRow]
-  with ResultTypeQueryable[CRow]
-  with Compiler[MapFunction[Any, Row]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: MapFunction[Any, Row] = _
-  private var outCRow: CRow = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating MapFunction.")
-    function = clazz.newInstance()
-    outCRow = new CRow(null, true)
-  }
-
-  override def map(in: Any): CRow = {
-    outCRow.row = function.map(in)
-    outCRow
-  }
-
-  override def getProducedType: TypeInformation[CRow] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala
new file mode 100644
index 0000000..c80f291
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class CRowOutputProcessRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[Any, CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Any, Row]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating ProcessFunction.")
+    function = clazz.newInstance()
+
+    this.cRowWrapper = new CRowWrappingCollector()
+    this.cRowWrapper.setChange(true)
+  }
+
+  override def processElement(
+      in: Any,
+      ctx: ProcessFunction[Any, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    // remove timestamp from stream record
+    val tc = out.asInstanceOf[TimestampedCollector[_]]
+    tc.eraseTimestamp()
+
+    cRowWrapper.out = out
+    function.processElement(in, ctx.asInstanceOf[ProcessFunction[Any, Row]#Context], cRowWrapper)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
new file mode 100644
index 0000000..00961f0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]].
+  */
+class TimestampSetterProcessFunction(
+    val rowtimeIdx: Int,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow] {
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp])
+    out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp)
+    out.collect(in)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}
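
Note: the rowtime attribute is materialized as java.sql.Timestamp inside the
Row, so the StreamRecord timestamp has to be recovered as a long before it is
re-attached. A minimal sketch of that conversion; Calcite's SqlFunctions.toLong
is used (as in the function above) rather than Timestamp.getTime, to match how
the Table API encodes timestamps internally:

    import java.sql.Timestamp
    import org.apache.calcite.runtime.SqlFunctions

    // a rowtime value as it would appear inside the Row after materialization
    val rowtime: AnyRef = new Timestamp(1000L)
    val millis: Long = SqlFunctions.toLong(rowtime.asInstanceOf[Timestamp])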

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
new file mode 100644
index 0000000..8f12c30
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class WrappingTimestampSetterProcessFunction[OUT](
+    function: MapFunction[CRow, OUT],
+    rowtimeIdx: Int)
+  extends ProcessFunction[CRow, OUT] {
+
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    function match {
+      case f: RichMapFunction[_, _] =>
+        f.setRuntimeContext(getRuntimeContext)
+        f.open(parameters)
+      case _ =>
+    }
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, OUT]#Context,
+      out: Collector[OUT]): Unit = {
+
+    val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp])
+    out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp)
+
+    out.collect(function.map(in))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index c9f98e3..52105e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{AggregationCodeGenerator, CodeGenerator}
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
@@ -66,7 +66,7 @@ object AggregateUtil {
     * @param inputType Physical type of the row.
     * @param inputTypeInfo Physical type information of the row.
     * @param inputFieldTypeInfo Physical type information of the row's fields.
-    * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
+    * @param rowTimeIdx The index of the rowtime field or None in case of processing time.
     * @param isPartitioned It is a tag that indicate whether the input is partitioned
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
@@ -77,7 +77,7 @@ object AggregateUtil {
       inputTypeInfo: TypeInformation[Row],
       inputFieldTypeInfo: Seq[TypeInformation[_]],
       queryConfig: StreamQueryConfig,
-      isRowTimeType: Boolean,
+      rowTimeIdx: Option[Int],
       isPartitioned: Boolean,
       isRowsClause: Boolean)
     : ProcessFunction[CRow, CRow] = {
@@ -111,13 +111,14 @@ object AggregateUtil {
       needReset = false
     )
 
-    if (isRowTimeType) {
+    if (rowTimeIdx.isDefined) {
       if (isRowsClause) {
         // ROWS unbounded over process function
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
           CRowTypeInfo(inputTypeInfo),
+          rowTimeIdx.get,
           queryConfig)
       } else {
         // RANGE unbounded over process function
@@ -125,6 +126,7 @@ object AggregateUtil {
           genFunction,
           aggregationStateType,
           CRowTypeInfo(inputTypeInfo),
+          rowTimeIdx.get,
           queryConfig)
       }
     } else {
@@ -207,7 +209,7 @@ object AggregateUtil {
     * @param inputFieldTypeInfo Physical type information of the row's fields.
     * @param precedingOffset the preceding offset
     * @param isRowsClause    It is a tag that indicates whether the OVER clause is ROWS clause
-    * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
+    * @param rowTimeIdx      The index of the rowtime field or None in case of processing time.
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createBoundedOverProcessFunction(
@@ -219,7 +221,7 @@ object AggregateUtil {
       precedingOffset: Long,
       queryConfig: StreamQueryConfig,
       isRowsClause: Boolean,
-      isRowTimeType: Boolean)
+      rowTimeIdx: Option[Int])
     : ProcessFunction[CRow, CRow] = {
 
     val needRetract = true
@@ -253,13 +255,14 @@ object AggregateUtil {
       needReset = true
     )
 
-    if (isRowTimeType) {
+    if (rowTimeIdx.isDefined) {
       if (isRowsClause) {
         new RowTimeBoundedRowsOver(
           genFunction,
           aggregationStateType,
           inputRowType,
           precedingOffset,
+          rowTimeIdx.get,
           queryConfig)
       } else {
         new RowTimeBoundedRangeOver(
@@ -267,6 +270,7 @@ object AggregateUtil {
           aggregationStateType,
           inputRowType,
           precedingOffset,
+          rowTimeIdx.get,
           queryConfig)
       }
     } else {
@@ -588,7 +592,7 @@ object AggregateUtil {
     window match {
       case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
         // tumbling time window
-        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for incremental aggregations
           new DataSetTumbleTimeWindowAggReduceCombineFunction(
@@ -615,7 +619,7 @@ object AggregateUtil {
           asLong(size))
 
       case SessionGroupWindow(_, _, gap) =>
-        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
         new DataSetSessionWindowAggReduceGroupFunction(
           genFinalAggFunction,
           keysAndAggregatesArity,
@@ -625,7 +629,7 @@ object AggregateUtil {
           isInputCombined)
 
       case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
-        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+        val (startPos, endPos, _) = computeWindowPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for partial aggregations
           new DataSetSlideWindowAggReduceCombineFunction(
@@ -951,10 +955,11 @@ object AggregateUtil {
     : AllWindowFunction[Row, CRow, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
       new IncrementalAggregateAllTimeWindowFunction(
         startPos,
         endPos,
+        timePos,
         finalRowArity)
         .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]]
     } else {
@@ -975,12 +980,13 @@ object AggregateUtil {
     WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
       new IncrementalAggregateTimeWindowFunction(
         numGroupingKeys,
         numAggregates,
         startPos,
         endPos,
+        timePos,
         finalRowArity)
         .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
     } else {
@@ -1136,25 +1142,31 @@ object AggregateUtil {
     }
   }
 
-  private[flink] def computeWindowStartEndPropertyPos(
-      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
+  private[flink] def computeWindowPropertyPos(
+      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = {
 
-    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
-      (p, x) => p match {
+    val propPos = properties.foldRight(
+      (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
+      case (p, (s, e, t, i)) => p match {
         case NamedWindowProperty(_, prop) =>
           prop match {
-            case WindowStart(_) if x._1.isDefined =>
+            case WindowStart(_) if s.isDefined =>
               throw new TableException("Duplicate WindowStart property 
encountered. This is a bug.")
             case WindowStart(_) =>
-              (Some(x._3), x._2, x._3 - 1)
-            case WindowEnd(_) if x._2.isDefined =>
+              (Some(i), e, t, i - 1)
+            case WindowEnd(_) if e.isDefined =>
               throw new TableException("Duplicate WindowEnd property 
encountered. This is a bug.")
             case WindowEnd(_) =>
-              (x._1, Some(x._3), x._3 - 1)
+              (s, Some(i), t, i - 1)
+            case RowtimeAttribute(_) if t.isDefined =>
+              throw new TableException(
+                "Duplicate Window rowtime property encountered. This is a 
bug.")
+            case RowtimeAttribute(_) =>
+              (s, e, Some(i), i - 1)
           }
       }
     }
-    (propPos._1, propPos._2)
+    (propPos._1, propPos._2, propPos._3)
   }
 
   private def transformToAggregateFunctions(
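
Note: computeWindowPropertyPos folds over the window properties from right to
left, handing out decreasing offsets (0, -1, -2, ...) that the property
collectors later resolve against the end of the output row. A standalone sketch
of the extended three-slot fold, with plain strings standing in for
NamedWindowProperty:

    val properties = Seq("start", "end", "rowtime")

    val (startPos, endPos, timePos, _) =
      properties.foldRight(
        (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
        case ("start", (s, e, t, i))   => (Some(i), e, t, i - 1)
        case ("end", (s, e, t, i))     => (s, Some(i), t, i - 1)
        case ("rowtime", (s, e, t, i)) => (s, e, Some(i), i - 1)
      }
    // startPos = Some(-2), endPos = Some(-1), timePos = Some(0)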

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index fabf200..2160ef5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -78,7 +78,10 @@ class DataSetSessionWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(
+      finalRowWindowStartPos,
+      finalRowWindowEndPos,
+      None)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index 56ed08a..e4b9458 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -68,7 +68,10 @@ class DataSetSlideWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+    collector = new RowTimeWindowPropertyCollector(
+      finalRowWindowStartPos,
+      finalRowWindowEndPos,
+      None)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 8af2c2e..b4f7585 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -67,7 +67,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
 
     output = function.createOutputRow()
     accumulators = function.createAccumulators()
-    collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos, None)
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index 711cc05..3c2e858 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -29,13 +29,15 @@ import org.apache.flink.util.Collector
   *
   * Computes the final aggregate value from incrementally computed aggregates.
   *
-  * @param windowStartPos the start position of window
-  * @param windowEndPos   the end position of window
+  * @param windowStartOffset the offset of the window start property
+  * @param windowEndOffset   the offset of the window end property
+  * @param windowRowtimeOffset the offset of the window rowtime property
   * @param finalRowArity  The arity of the final output row.
   */
 class IncrementalAggregateAllTimeWindowFunction(
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int],
+    private val windowStartOffset: Option[Int],
+    private val windowEndOffset: Option[Int],
+    private val windowRowtimeOffset: Option[Int],
     private val finalRowArity: Int)
   extends IncrementalAggregateAllWindowFunction[TimeWindow](
     finalRowArity) {
@@ -43,7 +45,10 @@ class IncrementalAggregateAllTimeWindowFunction(
   private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(
+      windowStartOffset,
+      windowEndOffset,
+      windowRowtimeOffset)
     super.open(parameters)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index 809bbfd..1950230 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -29,15 +29,19 @@ import org.apache.flink.util.Collector
 /**
   * Computes the final aggregate value from incrementally computed aggregates.
   *
-  * @param windowStartPos the start position of window
-  * @param windowEndPos   the end position of window
-  * @param finalRowArity  The arity of the final output row
+  * @param numGroupingKey the number of grouping keys
+  * @param numAggregates the number of aggregates
+  * @param windowStartOffset the offset of the window start property
+  * @param windowEndOffset   the offset of the window end property
+  * @param windowRowtimeOffset the offset of the window rowtime property
+  * @param finalRowArity  The arity of the final output row.
   */
 class IncrementalAggregateTimeWindowFunction(
     private val numGroupingKey: Int,
     private val numAggregates: Int,
-    private val windowStartPos: Option[Int],
-    private val windowEndPos: Option[Int],
+    private val windowStartOffset: Option[Int],
+    private val windowEndOffset: Option[Int],
+    private val windowRowtimeOffset: Option[Int],
     private val finalRowArity: Int)
   extends IncrementalAggregateWindowFunction[TimeWindow](
     numGroupingKey,
@@ -47,7 +51,10 @@ class IncrementalAggregateTimeWindowFunction(
   private var collector: CRowTimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    collector = new CRowTimeWindowPropertyCollector(
+      windowStartOffset,
+      windowEndOffset,
+      windowRowtimeOffset)
     super.open(parameters)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 8f2ec98..ab3dc1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import java.util.{ArrayList, List => JList}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -123,6 +124,9 @@ class ProcTimeBoundedRangeOver(
       return
     }
 
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+
     // we consider the original timestamp of events
     // that have registered this time trigger 1 ms ago
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
index 2d0b14b..1e12060 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
@@ -23,10 +23,11 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.types.Row
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.util.{Collector, Preconditions}
-
 import java.util.ArrayList
 import java.util.Collections
 
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+
 
 /**
  * ProcessFunction to sort on processing time and additional attributes.
@@ -75,7 +76,10 @@ class ProcTimeSortProcessFunction(
     timestamp: Long,
     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
-    
+
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+
     val iter =  bufferedEvents.get.iterator()
 
     // insert all rows into the sort buffer

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 1a207bb..ceb986d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -17,13 +17,16 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList, List => JList}
 
+import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -44,6 +47,7 @@ class RowTimeBoundedRangeOver(
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
     precedingOffset: Long,
+    rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
@@ -114,7 +118,7 @@ class RowTimeBoundedRangeOver(
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
     // triggering timestamp for trigger calculation
-    val triggeringTs = ctx.timestamp
+    val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
 
     val lastTriggeringTs = lastTriggeringTsState.value
 
@@ -166,6 +170,9 @@ class RowTimeBoundedRangeOver(
       return
     }
 
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
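
Note: since the rowtime indicator now lives inside the row, the trigger
timestamp is read from the rowtime column instead of ctx.timestamp. A
self-contained sketch of the lookup (rowTimeIdx is wired in by AggregateUtil):

    import java.sql.Timestamp
    import org.apache.calcite.runtime.SqlFunctions
    import org.apache.flink.types.Row

    // a two-field row whose second column is the materialized rowtime
    val input = new Row(2)
    input.setField(0, "key")
    input.setField(1, new Timestamp(42L))

    val rowTimeIdx = 1 // position of the rowtime attribute in the row
    val triggeringTs =
      SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])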
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index a4b1076..678a3b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -17,14 +17,17 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.sql.Timestamp
 import java.util
 import java.util.{List => JList}
 
+import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
@@ -45,6 +48,7 @@ class RowTimeBoundedRowsOver(
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
     precedingOffset: Long,
+    rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
@@ -123,7 +127,7 @@ class RowTimeBoundedRowsOver(
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
     // triggering timestamp for trigger calculation
-    val triggeringTs = ctx.timestamp
+    val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
 
     val lastTriggeringTs = lastTriggeringTsState.value
     // check if the data is expired, if not, save the data and register event time timer
@@ -175,6 +179,9 @@ class RowTimeBoundedRowsOver(
       return
     }
 
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+
     // gets all window data from state for the calculation
     val inputs: JList[Row] = dataState.get(timestamp)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
index 737f32c..fd58678 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.common.state.ValueState
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.common.state.MapState
@@ -28,18 +30,22 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
-
 import java.util.Collections
-import java.util.{List => JList, ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 
 /**
 * ProcessFunction to sort on event-time and possibly additional secondary sort attributes.
  *
   * @param inputRowType The data type of the input data.
+  * @param rowtimeIdx The index of the rowtime field.
   * @param rowComparator A comparator to sort rows.
  */
 class RowTimeSortProcessFunction(
     private val inputRowType: CRowTypeInfo,
+    private val rowtimeIdx: Int,
     private val rowComparator: Option[CollectionRowComparator])
   extends ProcessFunction[CRow, CRow] {
 
@@ -84,7 +90,7 @@ class RowTimeSortProcessFunction(
     val input = inputC.row
     
     // timestamp of the processed row
-    val rowtime = ctx.timestamp
+    val rowtime = SqlFunctions.toLong(input.getField(rowtimeIdx).asInstanceOf[Timestamp])
 
     val lastTriggeringTs = lastTriggeringTsState.value
 
@@ -105,13 +111,15 @@ class RowTimeSortProcessFunction(
       }
     }
   }
-  
-  
+
   override def onTimer(
     timestamp: Long,
     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
-    
+
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+
     // gets all rows for the triggering timestamps
     val inputs: JList[Row] = dataState.get(timestamp)
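
Each onTimer() in these functions now clears the record timestamp before emitting, since emitted rows carry their rowtime as a regular field. A sketch of that emission step, assuming out is the TimestampedCollector the operator passes in (emitRows is a hypothetical helper, not part of the commit):

    import org.apache.flink.streaming.api.operators.TimestampedCollector
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.util.Collector

    def emitRows(out: Collector[CRow], rows: Iterable[CRow]): Unit = {
      // Remove the timestamp that was set outside of the ProcessFunction;
      // consumers read the rowtime from the row fields instead.
      out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
      rows.foreach(out.collect)
    }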
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index f38ba93..04b63a1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -17,14 +17,16 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.sql.Timestamp
 import java.util
 import java.util.{List => JList}
 
+import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.streaming.api.operators.TimestampedCollector
@@ -45,6 +47,7 @@ abstract class RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
+    rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
   extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations] {
@@ -108,7 +111,7 @@ abstract class RowTimeUnboundedOver(
     // register state-cleanup timer
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
-    val timestamp = ctx.timestamp()
+    val timestamp = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
     val curWatermark = ctx.timerService().currentWatermark()
 
     // discard late record
@@ -158,8 +161,8 @@ abstract class RowTimeUnboundedOver(
       return
     }
 
-    Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]])
-    val collector = out.asInstanceOf[TimestampedCollector[CRow]]
+    // remove timestamp set outside of ProcessFunction.
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
 
     val keyIterator = rowMapState.keys.iterator
     if (keyIterator.hasNext) {
@@ -188,10 +191,9 @@ abstract class RowTimeUnboundedOver(
       while (!sortedTimestamps.isEmpty) {
         val curTimestamp = sortedTimestamps.removeFirst()
         val curRowList = rowMapState.get(curTimestamp)
-        collector.setAbsoluteTimestamp(curTimestamp)
 
         // process rows with the same timestamp; the mechanism differs between ROWS and RANGE
-        processElementsWithSameTimestamp(curRowList, lastAccumulator, collector)
+        processElementsWithSameTimestamp(curRowList, lastAccumulator, out)
 
         rowMapState.remove(curTimestamp)
       }
@@ -250,11 +252,13 @@ class RowTimeUnboundedRowsOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
+    rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
     inputType,
+    rowTimeIdx,
     queryConfig) {
 
   override def processElementsWithSameTimestamp(
@@ -266,7 +270,6 @@ class RowTimeUnboundedRowsOver(
     while (i < curRowList.size) {
       val curRow = curRowList.get(i)
 
-      var j = 0
       // copy forwarded fields to output row
       function.setForwardedFields(curRow, output.row)
 
@@ -290,11 +293,13 @@ class RowTimeUnboundedRangeOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
+    rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
   extends RowTimeUnboundedOver(
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
     inputType,
+    rowTimeIdx,
     queryConfig) {
 
   override def processElementsWithSameTimestamp(

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index 5f83f1d..d62c7b9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -60,6 +60,8 @@ object SortUtil {
     inputTypeInfo: TypeInformation[Row],
     execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
 
+    val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
+
     val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
 
       val rowComp = createRowComparator(
@@ -76,6 +78,7 @@ object SortUtil {
  
     new RowTimeSortProcessFunction(
       inputCRowType,
+      rowtimeIdx,
       collectionRowComparator)
 
   }
@@ -139,7 +142,7 @@ object SortUtil {
     }
 
     new RowComparator(
-      new RowSchema(inputType).physicalArity,
+      new RowSchema(inputType).arity,
       sortFields.toArray,
       fieldComps.toArray,
       new Array[TypeSerializer[AnyRef]](0), // not required because we only compare objects.

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
index 0c8ae00..4ec5239 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -29,7 +29,8 @@ import org.apache.flink.util.Collector
   */
 abstract class TimeWindowPropertyCollector[T](
     windowStartOffset: Option[Int],
-    windowEndOffset: Option[Int])
+    windowEndOffset: Option[Int],
+    windowRowtimeOffset: Option[Int])
   extends Collector[T] {
 
   var wrappedCollector: Collector[T] = _
@@ -55,20 +56,32 @@ abstract class TimeWindowPropertyCollector[T](
         SqlFunctions.internalToTimestamp(windowEnd))
     }
 
+    if (windowRowtimeOffset.isDefined) {
+      output.setField(
+        lastFieldPos + windowRowtimeOffset.get,
+        SqlFunctions.internalToTimestamp(windowEnd - 1))
+    }
+
     wrappedCollector.collect(record)
   }
 
   override def close(): Unit = wrappedCollector.close()
 }
 
-class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
-  extends TimeWindowPropertyCollector[Row](startOffset, endOffset) {
+class RowTimeWindowPropertyCollector(
+    startOffset: Option[Int],
+    endOffset: Option[Int],
+    rowtimeOffset: Option[Int])
+  extends TimeWindowPropertyCollector[Row](startOffset, endOffset, rowtimeOffset) {
 
   override def getRow(record: Row): Row = record
 }
 
-class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int])
-  extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) {
+class CRowTimeWindowPropertyCollector(
+    startOffset: Option[Int],
+    endOffset: Option[Int],
+    rowtimeOffset: Option[Int])
+  extends TimeWindowPropertyCollector[CRow](startOffset, endOffset, rowtimeOffset) {
 
   override def getRow(record: CRow): Row = record.row
 }
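
With the added windowRowtimeOffset, the rowtime of a window result is written as windowEnd - 1, i.e. the largest timestamp that still lies within the window. A hypothetical instantiation, assuming the start, end, and rowtime properties follow directly after the last aggregate field:

    // Offsets are relative to the last field position; the concrete values
    // here are illustrative, not taken from the commit.
    val collector = new CRowTimeWindowPropertyCollector(
      startOffset = Some(0),   // window start
      endOffset = Some(1),     // window end (exclusive)
      rowtimeOffset = Some(2)) // rowtime = windowEnd - 1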

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 379b8d2..b566113 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -418,8 +418,8 @@ object WindowJoinUtil {
       Some(rightType))
 
     val conversion = generator.generateConverterResultExpression(
-      returnType.physicalTypeInfo,
-      returnType.physicalType.getFieldNames.asScala)
+      returnType.typeInfo,
+      returnType.fieldNames)
 
     // if other condition is none, then output the result directly
     val body = otherCondition match {
@@ -429,9 +429,8 @@ object WindowJoinUtil {
            |${generator.collectorTerm}.collect(${conversion.resultTerm});
            |""".stripMargin
       case Some(remainCondition) =>
-        // map logical field accesses to physical accesses
-        val physicalCondition = returnType.mapRexNode(remainCondition)
-        val genCond = generator.generateExpression(physicalCondition)
+        // generate code for remaining condition
+        val genCond = generator.generateExpression(remainCondition)
         s"""
            |${genCond.code}
            |if (${genCond.resultTerm}) {
@@ -445,7 +444,7 @@ object WindowJoinUtil {
       ruleDescription,
       classOf[FlatJoinFunction[Row, Row, Row]],
       body,
-      returnType.physicalTypeInfo)
+      returnType.typeInfo)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
index 083f1eb..e0e054b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -40,6 +40,9 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean)
 
 object TimeIndicatorTypeInfo {
 
+  val ROWTIME_MARKER: Int = -1
+  val PROCTIME_MARKER: Int = -2
+
   val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
   val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
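
A plausible reading of the new marker constants (an assumption, since this hunk does not show their call sites) is that negative field indices flag virtual time attributes that have no physical column:

    // Hypothetical helper: negative indices mark virtual time indicators.
    def isTimeIndicatorMarker(idx: Int): Boolean =
      idx == TimeIndicatorTypeInfo.ROWTIME_MARKER ||
        idx == TimeIndicatorTypeInfo.PROCTIME_MARKER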
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
index a5a1319..d20002a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
@@ -41,7 +41,7 @@ class SortTest extends TableTestBase {
         unaryNode("DataStreamSort",
           streamTableNode(0),
           term("orderBy", "proctime ASC", "c ASC")),
-        term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c"))
+        term("select", "a", "PROCTIME(proctime) AS proctime", "c"))
 
     streamUtil.verifySql(sqlQuery, expected)
   }
@@ -57,7 +57,7 @@ class SortTest extends TableTestBase {
         unaryNode("DataStreamSort",
           streamTableNode(0),
           term("orderBy", "rowtime ASC, c ASC")),
-        term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c"))
+        term("select", "a", "rowtime", "c"))
        
     streamUtil.verifySql(sqlQuery, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index 5d4386c..6967061 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -43,7 +43,7 @@ class TableSourceTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, 
addTime])",
-        term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", 
"name", "val")
+        term("select", "addTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
@@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, 
pTime])",
-        term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", 
"val")
+        term("select", "PROCTIME(pTime) AS pTime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index b17debe..ab80c65 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -48,7 +48,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       streamTableNode(0),
-      term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS 
rowtime"),
+      term("select", "FLOOR(CAST(rowtime)", "FLAG(DAY)) AS rowtime"),
       term("where", ">(long, 0)")
     )
 
@@ -65,8 +65,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       streamTableNode(0),
-      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
-        "TIME_MATERIALIZATION(proctime) AS proctime")
+      term("select", "rowtime", "long", "int",
+        "PROCTIME(proctime) AS proctime")
     )
 
     util.verifyTable(result, expected)
@@ -84,7 +84,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       streamTableNode(0),
-      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+      term("select", "rowtime"),
       term("where", ">(rowtime, 1990-12-02 12:11:11)")
     )
 
@@ -107,7 +107,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "long", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+          term("select", "long", "CAST(rowtime) AS rowtime")
         ),
         term("groupBy", "rowtime"),
         term("select", "rowtime", "COUNT(long) AS TMP_0")
@@ -134,7 +134,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long")
+          term("select", "CAST(rowtime) AS rowtime", "long")
         ),
         term("groupBy", "long"),
         term("select", "long", "MIN(rowtime) AS TMP_0")
@@ -159,16 +159,13 @@ class TimeIndicatorConversionTest extends TableTestBase {
         "DataStreamCorrelate",
         streamTableNode(0),
         term("invocation",
-          s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), 
TIME_MATERIALIZATION($$3), '')"),
+          s"${func.functionIdentifier}(CAST($$0):TIMESTAMP(3) NOT NULL, 
PROCTIME($$3), '')"),
         term("function", func),
         term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT 
long, INTEGER int, " +
           "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"),
         term("joinType", "INNER")
       ),
-      term("select",
-        "TIME_MATERIALIZATION(rowtime) AS rowtime",
-        "TIME_MATERIALIZATION(proctime) AS proctime",
-        "s")
+      term("select", "rowtime", "PROCTIME(proctime) AS proctime", "s")
     )
 
     util.verifyTable(result, expected)
@@ -219,7 +216,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         streamTableNode(0),
         term("union all", "rowtime", "long", "int")
       ),
-      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+      term("select", "rowtime")
     )
 
     util.verifyTable(result, expected)
@@ -287,7 +284,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "TIME_MATERIALIZATION(proctime) AS proctime", "long")
+          term("select", "PROCTIME(proctime) AS proctime", "long")
         ),
         term("groupBy", "proctime"),
         term("select", "proctime", "COUNT(long) AS EXPR$0")
@@ -312,7 +309,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "long", "TIME_MATERIALIZATION(proctime) AS proctime")
+          term("select", "long", "PROCTIME(proctime) AS proctime")
         ),
         term("groupBy", "long"),
         term("select", "long", "MIN(proctime) AS EXPR$0")
@@ -368,7 +365,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "long", "rowtime", "TIME_MATERIALIZATION(rowtime) AS 
$f2")
+          term("select", "long", "rowtime", "CAST(rowtime) AS rowtime0")
         ),
         term("groupBy", "long"),
         term(
@@ -377,7 +374,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
             'w$,
             'rowtime,
             100.millis)),
-        term("select", "long", "MIN($f2) AS EXPR$0")
+        term("select", "long", "MIN(rowtime0) AS EXPR$0")
       ),
       term("select", "EXPR$0", "long")
     )

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 80ff55e..04aada6 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -46,12 +46,10 @@ class HarnessTestBase {
     UserDefinedFunctionUtils.serialize(new IntSumWithRetractAggFunction)
 
   protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
-    INT_TYPE_INFO,
     LONG_TYPE_INFO,
-    INT_TYPE_INFO,
     STRING_TYPE_INFO,
     LONG_TYPE_INFO),
-    Array("a", "b", "c", "d", "e"))
+    Array("rowtime", "a", "b"))
 
   protected val SumRowType = new RowTypeInfo(Array[TypeInformation[_]](
     LONG_TYPE_INFO,
@@ -103,13 +101,13 @@ class HarnessTestBase {
       |
       |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
       |      (org.apache.flink.table.functions.AggregateFunction) fmin;
-      |    output.setField(5, baseClass0.getValue(
+      |    output.setField(3, baseClass0.getValue(
       |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
       |      accs.getField(0)));
       |
       |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
       |      (org.apache.flink.table.functions.AggregateFunction) fmax;
-      |    output.setField(6, baseClass1.getValue(
+      |    output.setField(4, baseClass1.getValue(
       |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
       |      accs.getField(1)));
       |  }
@@ -121,12 +119,12 @@ class HarnessTestBase {
       |    fmin.accumulate(
       |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
       |      accs.getField(0)),
-      |      (java.lang.Long) input.getField(4));
+      |      (java.lang.Long) input.getField(2));
       |
       |    fmax.accumulate(
       |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
       |      accs.getField(1)),
-      |      (java.lang.Long) input.getField(4));
+      |      (java.lang.Long) input.getField(2));
       |  }
       |
       |  public void retract(
@@ -136,12 +134,12 @@ class HarnessTestBase {
       |    fmin.retract(
       |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
       |      accs.getField(0)),
-      |      (java.lang.Long) input.getField(4));
+      |      (java.lang.Long) input.getField(2));
       |
       |    fmax.retract(
       |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
       |      accs.getField(1)),
-      |      (java.lang.Long) input.getField(4));
+      |      (java.lang.Long) input.getField(2));
       |  }
       |
       |  public org.apache.flink.types.Row createAccumulators() {
@@ -166,12 +164,10 @@ class HarnessTestBase {
       |    output.setField(0, input.getField(0));
       |    output.setField(1, input.getField(1));
       |    output.setField(2, input.getField(2));
-      |    output.setField(3, input.getField(3));
-      |    output.setField(4, input.getField(4));
       |  }
       |
       |  public org.apache.flink.types.Row createOutputRow() {
-      |    return new org.apache.flink.types.Row(7);
+      |    return new org.apache.flink.types.Row(5);
       |  }
       |
       |/*******  This test does not use the following methods  *******/
@@ -326,7 +322,7 @@ object HarnessTestBase {
   /**
     * Return 0 for equal Rows and non zero for different rows
     */
-  class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+  class RowResultSortComparator() extends Comparator[Object] with Serializable {
 
     override def compare(o1: Object, o2: Object): Int = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index 6c24c5d..065b7bc 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -154,7 +154,7 @@ class JoinHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(2: JInt, "bbb2", 2: JInt, "Hello2"), true), 25))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }
@@ -227,7 +227,7 @@ class JoinHarnessTest extends HarnessTestBase{
     expectedOutput.add(new StreamRecord(
       CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index 04214f9..dd14d7e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -91,7 +91,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1))
     expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }
@@ -150,7 +150,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
     expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), 
false), 10))
     expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), 
true), 10))
 
-    verify(expectedOutput, result, new RowResultSortComparator(0))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }
