This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 422ab8a928413dde21e5c4c8a4068751b7d23fb7
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Dec 13 09:48:38 2019 +0100

    [FLINK-15168][table-planner] Fix physical indices computing.
    
    Starting from this commit we use the schema that comes from the CatalogTable instead of the schema that comes from the TableSource.
    
    Computing physical indices is based on the new type hierarchy instead of TypeInformation.
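    
    For illustration, a minimal sketch of the new index computation, mirroring the pattern used in the planner scans in this diff. It assumes a tableSource, the tableSchema obtained from the CatalogTable, and an isStreamTable flag are in scope; it is not a verbatim excerpt of the commit.
    
        import java.util.function.{Function => JFunction}
    
        import org.apache.flink.table.sources.DefinedFieldMapping
        import org.apache.flink.table.utils.TypeMappingUtils
    
        // Map logical (catalog) field names to physical field names when the
        // TableSource defines an explicit field mapping; otherwise use identity.
        val nameMapping: JFunction[String, String] = tableSource match {
          case m: DefinedFieldMapping if m.getFieldMapping != null =>
            new JFunction[String, String] {
              override def apply(t: String): String = m.getFieldMapping.get(t)
            }
          case _ => JFunction.identity()
        }
    
        // Physical indices (or time attribute markers) for the columns of the
        // schema coming from the CatalogTable, not from TableSource.getTableSchema().
        val fieldIndexes: Array[Int] = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
          tableSource,
          tableSchema.getTableColumns,
          isStreamTable,
          nameMapping)
    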
---
 .../flink/table/catalog/ConnectorCatalogTable.java |   6 +-
 .../flink/table/catalog/DatabaseCalciteSchema.java |   2 +
 .../flink/table/plan/QueryOperationConverter.java  |   2 +
 .../table/plan/nodes/PhysicalTableSourceScan.scala |  15 +-
 .../plan/nodes/dataset/BatchTableSourceScan.scala  |  82 ++--
 .../nodes/datastream/StreamTableSourceScan.scala   |  77 ++--
 .../logical/FlinkLogicalTableSourceScan.scala      |  38 +-
 .../rules/dataSet/BatchTableSourceScanRule.scala   |   1 +
 .../datastream/StreamTableSourceScanRule.scala     |   1 +
 .../PushFilterIntoTableSourceScanRule.scala        |   2 +-
 .../PushProjectIntoTableSourceScanRule.scala       | 103 ++++-
 .../flink/table/plan/schema/TableSourceTable.scala |  66 +++-
 .../flink/table/sources/TableSourceUtil.scala      | 417 ++++-----------------
 .../table/catalog/CatalogStructureBuilder.java     |  10 +-
 14 files changed, 377 insertions(+), 445 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 1ea8b6f..2113a66 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -55,16 +55,16 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
        // NOTES: this should be false in BLINK planner, because BLINK planner always uses StreamTableSource.
        private final boolean isBatch;
 
-       public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) {
+       public static <T1> ConnectorCatalogTable<T1, ?> source(TableSource<T1> source, boolean isBatch) {
                final TableSchema tableSchema = calculateSourceSchema(source, isBatch);
                return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch);
        }
 
-       public static <T2> ConnectorCatalogTable sink(TableSink<T2> sink, boolean isBatch) {
+       public static <T2> ConnectorCatalogTable<?, T2> sink(TableSink<T2> sink, boolean isBatch) {
                return new ConnectorCatalogTable<>(null, sink, sink.getTableSchema(), isBatch);
        }
 
-       public static <T1, T2> ConnectorCatalogTable sourceAndSink(
+       public static <T1, T2> ConnectorCatalogTable<T1, T2> sourceAndSink(
                        TableSource<T1> source,
                        TableSink<T2> sink,
                        boolean isBatch) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 81f8579..1dd5b23 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -105,6 +105,7 @@ class DatabaseCalciteSchema implements Schema {
        private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
                Optional<TableSourceTable> tableSourceTable = table.getTableSource()
                        .map(tableSource -> new TableSourceTable<>(
+                               table.getSchema(),
                                tableSource,
                                !table.isBatch(),
                                FlinkStatistic.UNKNOWN()));
@@ -142,6 +143,7 @@ class DatabaseCalciteSchema implements Schema {
                }
 
                return new TableSourceTable<>(
+                       table.getSchema(),
                        tableSource,
                        // this means the TableSource extends from StreamTableSource, this is needed for the
                        // legacy Planner. Blink Planner should use the information that comes from the TableSource
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 3709dd5..2855206 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -299,6 +299,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
                @Override
                public <U> RelNode visit(TableSourceQueryOperation<U> tableSourceTable) {
                        final Table relTable = new TableSourceTable<>(
+                               tableSourceTable.getTableSchema(),
                                tableSourceTable.getTableSource(),
                                !tableSourceTable.isBatch(),
                                FlinkStatistic.UNKNOWN());
@@ -316,6 +317,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
                                        relTable.getRowType(relBuilder.getTypeFactory()),
                                        relTable,
                                        Schemas.path(catalogReader.getRootSchema(), Collections.singletonList(refId))),
+                               tableSourceTable.getTableSchema(),
                                tableSourceTable.getTableSource(),
                                Option.empty()
                        );
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index 4aad828..0efdd14 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.plan.nodes
 
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources.TableSource
+
 import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet}
 import org.apache.calcite.rel.RelWriter
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
@@ -31,18 +31,11 @@ abstract class PhysicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
+    tableSchema: TableSchema,
     tableSource: TableSource[_],
     val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table) {
 
-  override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val streamingTable = tableSource.isInstanceOf[StreamTableSource[_]]
-
-    TableSourceUtil
-      .getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
-  }
-
   override def explainTerms(pw: RelWriter): RelWriter = {
     val terms = super.explainTerms(pw)
         .item("fields", deriveRowType().getFieldNames.asScala.mkString(", "))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 8e91e90..a9e73de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -18,41 +18,54 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.table.api.internal.BatchTableEnvImpl
-import org.apache.flink.table.api.{BatchQueryConfig, TableException, Types}
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.{BatchQueryConfig, TableException, TableSchema, Types}
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.sources._
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
+import org.apache.flink.table.utils.TypeMappingUtils
 import org.apache.flink.types.Row
 
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.function.{Function => JFunction}
+
+import scala.collection.JavaConverters._
+
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
 class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
+    tableSchema: TableSchema,
     tableSource: TableSource[_],
     selectedFields: Option[Array[Int]])
-  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
+  extends PhysicalTableSourceScan(
+    cluster,
+    traitSet,
+    table,
+    tableSchema,
+    tableSource,
+    selectedFields)
   with BatchScan {
 
   override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      selectedFields,
-      streaming = false,
-      flinkTypeFactory)
+    val baseRowType = table.getRowType
+    selectedFields.map(idxs => {
+      val fields = baseRowType.getFieldList
+      val builder = cluster.getTypeFactory.builder()
+      idxs.map(fields.get).foreach(builder.add)
+      builder.build()
+    }).getOrElse(baseRowType)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -65,6 +78,7 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
+      tableSchema,
       tableSource,
       selectedFields
     )
@@ -78,6 +92,7 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
+      tableSchema,
       tableSource,
       selectedFields
     )
@@ -87,11 +102,6 @@ class BatchTableSourceScan(
       tableEnv: BatchTableEnvImpl,
       queryConfig: BatchQueryConfig): DataSet[Row] = {
 
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      isStreamTable = false,
-      selectedFields)
-
     val config = tableEnv.getConfig
     val inputDataSet = tableSource match {
       case batchSource: BatchTableSource[_] =>
@@ -108,8 +118,6 @@ class BatchTableSourceScan(
       case _ => throw new TableException("Only BatchTableSource and InputFormatTableSource are " +
         "supported in BatchTableEnvironment.")
     }
-    val outputSchema = new RowSchema(this.getRowType)
-
     val inputDataType = fromLegacyInfoToDataType(inputDataSet.getType)
     val producedDataType = tableSource.getProducedDataType
 
@@ -121,18 +129,36 @@ class BatchTableSourceScan(
         s"Please validate the implementation of the TableSource.")
     }
 
-    // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val nameMapping: JFunction[String, String] = tableSource match {
+      case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+          new JFunction[String, String] {
+            override def apply(t: String): String = mapping.getFieldMapping.get(t)
+          }
+      case _ => JFunction.identity()
+    }
+
+    val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
       tableSource,
-      selectedFields,
-      cluster,
-      tableEnv.getRelBuilder,
-      Types.SQL_TIMESTAMP
+      selectedFields.map(_.map(tableSchema.getTableColumn(_).get()).toList.asJava)
+        .getOrElse(tableSchema.getTableColumns),
+      false,
+      nameMapping
     )
 
+    val rowtimeExpression = TableSourceUtil.getRowtimeAttributeDescriptor(
+        tableSource,
+        selectedFields)
+      .map(desc => TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP()),
+        tableEnv.getRelBuilder,
+        nameMapping
+      ))
+
     // ingest table and convert and extract time attributes if necessary
     convertToInternalRow(
-      outputSchema,
+      new RowSchema(getRowType),
       inputDataSet,
       fieldIndexes,
       config,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index ab2ef0f..573bfff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -18,42 +18,55 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{StreamQueryConfig, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.{StreamQueryConfig, TableException, TableSchema}
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner}
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.utils.TypeMappingUtils
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.function.{Function => JFunction}
+
+import scala.collection.JavaConverters._
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
+    tableSchema: TableSchema,
     tableSource: StreamTableSource[_],
     selectedFields: Option[Array[Int]])
-  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
+  extends PhysicalTableSourceScan(
+    cluster,
+    traitSet,
+    table,
+    tableSchema,
+    tableSource,
+    selectedFields)
   with StreamScan {
 
   override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      selectedFields,
-      streaming = true,
-      flinkTypeFactory)
+    val baseRowType = table.getRowType
+    selectedFields.map(idxs => {
+      val fields = baseRowType.getFieldList
+      val builder = cluster.getTypeFactory.builder()
+      idxs.map(fields.get).foreach(builder.add)
+      builder.build()
+    }).getOrElse(baseRowType)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -66,6 +79,7 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
+      tableSchema,
       tableSource,
       selectedFields
     )
@@ -79,6 +93,7 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
+      tableSchema,
       newTableSource.asInstanceOf[StreamTableSource[_]],
       selectedFields
     )
@@ -88,11 +103,6 @@ class StreamTableSourceScan(
       planner: StreamPlanner,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val fieldIndexes = TableSourceUtil.computeIndexMapping(
-      tableSource,
-      isStreamTable = true,
-      selectedFields)
-
     val config = planner.getConfig
     val inputDataStream = tableSource.getDataStream(planner.getExecutionEnvironment)
       .asInstanceOf[DataStream[Any]]
@@ -109,13 +119,32 @@ class StreamTableSourceScan(
         s"Please validate the implementation of the TableSource.")
     }
 
+    val nameMapping: JFunction[String, String] = tableSource match {
+      case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+        new JFunction[String, String] {
+          override def apply(t: String): String = mapping.getFieldMapping.get(t)
+        }
+      case _ => JFunction.identity()
+    }
+
     // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+    val rowtimeExpression = TableSourceUtil.getRowtimeAttributeDescriptor(
       tableSource,
-      selectedFields,
-      cluster,
-      planner.getRelBuilder,
-      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      selectedFields)
+      .map(desc => TableSourceUtil.getRowtimeExtractionExpression(
+        desc.getTimestampExtractor,
+        producedDataType,
+        TypeConversions.fromLegacyInfoToDataType(TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
+        planner.getRelBuilder,
+        nameMapping
+      ))
+
+    val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      tableSource,
+      selectedFields.map(_.map(tableSchema.getTableColumn(_).get()).toList.asJava)
+        .getOrElse(tableSchema.getTableColumns),
+      true,
+      nameMapping
     )
 
     // ingest table and convert and extract time attributes if necessary
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index fb0a09c..d444816 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -18,6 +18,13 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.{FilterableTableSource, TableSource}
+import org.apache.flink.table.types.utils.{DataTypeUtils, TypeConversions}
+
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
@@ -25,10 +32,6 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{FilterableTableSource, TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
@@ -36,6 +39,7 @@ class FlinkLogicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
+    val tableSchema: TableSchema,
     val tableSource: TableSource[_],
     val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table)
@@ -43,16 +47,26 @@ class FlinkLogicalTableSourceScan(
 
   def copy(
       traitSet: RelTraitSet,
+      tableSchema: TableSchema,
       tableSource: TableSource[_],
       selectedFields: Option[Array[Int]]): FlinkLogicalTableSourceScan = {
-    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource, selectedFields)
+    new FlinkLogicalTableSourceScan(
+      cluster,
+      traitSet,
+      getTable,
+      tableSchema,
+      tableSource,
+      selectedFields)
   }
 
   override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreamingMode
-
-    TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
+    val baseRowType = table.getRowType
+    selectedFields.map(idxs => {
+      val fields = baseRowType.getFieldList
+      val builder = cluster.getTypeFactory.builder()
+      idxs.map(fields.get).foreach(builder.add)
+      builder.build()
+    }).getOrElse(baseRowType)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -74,7 +88,7 @@ class FlinkLogicalTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val terms = super.explainTerms(pw)
-        .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", "))
+        .item("fields", tableSchema.getFieldNames.mkString(", "))
 
     val sourceDesc = tableSource.explainSource()
     if (sourceDesc.nonEmpty) {
@@ -113,11 +127,13 @@ class FlinkLogicalTableSourceScanConverter
     val scan = rel.asInstanceOf[TableScan]
     val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
 
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     new FlinkLogicalTableSourceScan(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource,
+      tableSourceTable.tableSchema,
+      tableSourceTable.tableSource,
       None
     )
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index e45ad85..8942835 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -50,6 +50,7 @@ class BatchTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
+      scan.tableSchema,
       scan.tableSource,
       scan.selectedFields
     )
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 1ddc1c3..959c893 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -52,6 +52,7 @@ class StreamTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
+      scan.tableSchema,
       scan.tableSource.asInstanceOf[StreamTableSource[_]],
       scan.selectedFields
     )
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 44b100f..a8fbd78 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -113,7 +113,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
 
     // check whether we still need a RexProgram. An RexProgram is needed when either
     // projection or filter exists.
-    val newScan = scan.copy(scan.getTraitSet, newTableSource, scan.selectedFields)
+    val newScan = scan.copy(scan.getTraitSet, scan.tableSchema, newTableSource, scan.selectedFields)
     val newRexProgram = {
       if (remainingCondition != null || !program.projectsOnlyIdentity) {
         val expandedProjectList = program.getProjectList.asScala
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 9864cd0..932b745 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -18,12 +18,22 @@
 
 package org.apache.flink.table.plan.rules.logical
 
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
+import org.apache.flink.table.api.{TableException, TableSchema}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
+import org.apache.flink.table.sources.TableSourceUtil.getRowtimeAttributeDescriptor
 import org.apache.flink.table.sources._
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
+import org.apache.flink.table.types.utils.DataTypeUtils
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.utils.TypeMappingUtils
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+
+import java.util.function.{Function => JFunction}
+
+import scala.collection.JavaConverters._
 
 class PushProjectIntoTableSourceScanRule extends RelOptRule(
   operand(classOf[FlinkLogicalCalc],
@@ -43,7 +53,29 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
     val source = scan.tableSource
 
     val accessedLogicalFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
-    val accessedPhysicalFields = TableSourceUtil.getPhysicalIndexes(source, accessedLogicalFields)
+
+    val nameMapping: JFunction[String, String] = source match {
+      case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+        new JFunction[String, String] {
+          override def apply(t: String): String = mapping.getFieldMapping.get(t)
+        }
+      case _ => JFunction.identity()
+    }
+
+    val accessedIndices = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+      source,
+      accessedLogicalFields.map(scan.tableSchema.getTableColumn(_).get()).toList.asJava,
+      source.isInstanceOf[StreamTableSource[_]],
+      nameMapping
+    )
+    val physicalFields = expandTimeAttributes(
+      source,
+      if (LogicalTypeChecks.isCompositeType(source.getProducedDataType.getLogicalType)) {
+        DataTypeUtils.expandCompositeTypeToSchema(source.getProducedDataType)
+      } else {
+        null
+      },
+      accessedIndices)
 
     // only continue if fields are projected or reordered.
     // eager reordering can remove a calc operator.
@@ -53,10 +85,10 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
       val (newTableSource, isProjectSuccess) = source match {
         case nested: NestedFieldsProjectableTableSource[_] =>
           val nestedFields = RexProgramExtractor
-            .extractRefNestedInputFields(calc.getProgram, accessedPhysicalFields)
-          (nested.projectNestedFields(accessedPhysicalFields, nestedFields), true)
+            .extractRefNestedInputFields(calc.getProgram, physicalFields)
+          (nested.projectNestedFields(physicalFields, nestedFields), true)
         case projecting: ProjectableTableSource[_] =>
-          (projecting.projectFields(accessedPhysicalFields), true)
+          (projecting.projectFields(physicalFields), true)
         case nonProjecting: TableSource[_] =>
           // projection cannot be pushed to TableSource
           (nonProjecting, false)
@@ -77,7 +109,11 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
       }
 
       // Apply the projection during the input conversion of the scan.
-      val newScan = scan.copy(scan.getTraitSet, newTableSource, Some(accessedLogicalFields))
+      val newScan = scan.copy(
+        scan.getTraitSet,
+        scan.tableSchema,
+        newTableSource,
+        Some(accessedLogicalFields))
       val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
         calc.getProgram,
         newScan.getRowType,
@@ -93,6 +129,55 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
       }
     }
   }
+
+  private def expandTimeAttributes(
+      tableSource: TableSource[_],
+      physicalSchema: TableSchema,
+      physicalIndices: Array[Int]): Array[Int] = {
+
+      physicalIndices
+      // resolve time indicator markers to physical indexes
+      .flatMap {
+        case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
+          // proctime field do not access a physical field
+          Seq()
+        case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
+          // rowtime field is computed.
+          // get names of fields which are accessed by the expression to compute the rowtime field.
+          val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None)
+          val accessedFields = if (rowtimeAttributeDescriptor.isDefined) {
+            rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields
+          } else {
+            throw new TableException("Computed field mapping includes a rowtime marker but the " +
+              "TableSource does not provide a RowtimeAttributeDescriptor. " +
+              "This is a bug and should be reported.")
+          }
+          // resolve field names to physical fields
+          accessedFields.map(name => {
+            if (physicalSchema != null) {
+              mapToCompositeType(tableSource, physicalSchema, name)
+            } else {
+              0
+            }
+          })
+        case idx =>
+          Seq(idx)
+      }
+  }
+
+  private def mapToCompositeType(
+      tableSource: TableSource[_],
+      physicalSchema: TableSchema,
+      name: String)
+    : Int = {
+    val fieldName = tableSource match {
+      case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+        mapping.getFieldMapping.get(name)
+      case _ =>
+        name
+    }
+    physicalSchema.getFieldNames.indexWhere(_.equals(fieldName)) // logical accessed field
+  }
 }
 
 object PushProjectIntoTableSourceScanRule {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index a134968..eedc105 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,12 +18,21 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.{DefinedFieldMapping, TableSource, TableSourceValidation}
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
+import org.apache.flink.table.types.utils.{DataTypeUtils, TypeConversions}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.utils.TypeMappingUtils
+
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSourceValidation}
+
+import java.util.function.{Function => JFunction}
 
 /**
   * Abstract class which define the interfaces required to convert a [[TableSource]] to
@@ -34,12 +43,13 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSource
   * @param statistic The table statistics.
   */
 class TableSourceTable[T](
+    val tableSchema: TableSchema,
     val tableSource: TableSource[T],
     val isStreamingMode: Boolean,
     val statistic: FlinkStatistic)
   extends AbstractTable {
 
-  TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema)
+  TableSourceValidation.validateTableSource(tableSource, tableSchema)
 
   /**
     * Returns statistics of current table
@@ -48,11 +58,53 @@ class TableSourceTable[T](
     */
   override def getStatistic: Statistic = statistic
 
+  // We must enrich logical schema from catalog table with physical type coming from table source.
+  // Schema coming from catalog table might not have proper conversion classes. Those must be
+  // extracted from produced type, before converting to RelDataType
   def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    TableSourceUtil.getRelDataType(
+
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val fieldNames = tableSchema.getFieldNames
+
+    val nameMapping: JFunction[String, String] = tableSource match {
+      case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
+        new JFunction[String, String] {
+          override def apply(t: String): String = mapping.getFieldMapping.get(t)
+        }
+      case _ => JFunction.identity()
+    }
+
+    val producedDataType = tableSource.getProducedDataType
+    val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
       tableSource,
-      None,
+      tableSchema.getTableColumns,
       isStreamingMode,
-      typeFactory.asInstanceOf[FlinkTypeFactory])
+      nameMapping
+    )
+
+    val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
+      val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
+      fieldIndexes.map(mapIndex(_,
+        idx =>
+          TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
+      )
+    } else {
+      fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
+    }
+
+    flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
+  }
+
+  def mapIndex(idx: Int, mapNonMarker: Int => TypeInformation[_]): TypeInformation[_] = {
+    idx match {
+      case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER => Types.SQL_TIMESTAMP()
+      case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER => Types.SQL_TIMESTAMP()
+      case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
+        TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+      case _ =>
+       mapNonMarker(idx)
+    }
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index f91f2fa..61e5957 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -18,165 +18,26 @@
 
 package org.apache.flink.table.sources
 
-import java.sql.Timestamp
-import com.google.common.collect.ImmutableList
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{DataTypes, Types, ValidationException}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall}
+import org.apache.flink.table.expressions.{Expression, PlannerExpressionConverter, ResolvedFieldReference}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST
+import org.apache.flink.table.sources.tsextractors.{TimestampExtractor, TimestampExtractorUtils}
+import org.apache.flink.table.types.DataType
+
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{DataTypes, TableException, Types, ValidationException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall}
-import org.apache.flink.table.expressions.{PlannerExpressionConverter, ResolvedFieldReference}
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST
-import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.calcite.rex.RexNode
 
-import scala.collection.JavaConverters._
+import java.util.function.{Function => JFunction}
 
 /** Util class for [[TableSource]]. */
 object TableSourceUtil {
 
-  /** Returns true if the [[TableSource]] has a rowtime attribute. */
-  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
-    getRowtimeAttributes(tableSource).nonEmpty
-
-  /** Returns true if the [[TableSource]] has a proctime attribute. */
-  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
-    getProctimeAttribute(tableSource).nonEmpty
-
-  /**
-    * Computes the indices that map the input type of the DataStream to the schema of the table.
-    *
-    * The mapping is based on the field names and fails if a table field cannot be
-    * mapped to a field of the input type.
-    *
-    * @param tableSource The table source for which the table schema is mapped to the input type.
-    * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise.
-    * @param selectedFields The indexes of the table schema fields for which a mapping is
-    *                       computed. If None, a mapping for all fields is computed.
-    * @return An index mapping from input type to table schema.
-    */
-  def computeIndexMapping(
-      tableSource: TableSource[_],
-      isStreamTable: Boolean,
-      selectedFields: Option[Array[Int]]): Array[Int] = {
-    val inputType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType)
-    val tableSchema = tableSource.getTableSchema
-
-    // get names of selected fields
-    val tableFieldNames = if (selectedFields.isDefined) {
-      val names = tableSchema.getFieldNames
-      selectedFields.get.map(names(_))
-    } else {
-      tableSchema.getFieldNames
-    }
-
-    // get types of selected fields
-    val tableFieldTypes = if (selectedFields.isDefined) {
-      val types = tableSchema.getFieldTypes
-      selectedFields.get.map(types(_))
-    } else {
-      tableSchema.getFieldTypes
-    }
-
-    // get rowtime and proctime attributes
-    val rowtimeAttributes = getRowtimeAttributes(tableSource)
-    val proctimeAttributes = getProctimeAttribute(tableSource)
-
-    // compute mapping of selected fields and time attributes
-    val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map {
-      case (t: SqlTimeTypeInfo[_], name: String)
-        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER
-        }
-      case (t: SqlTimeTypeInfo[_], name: String)
-        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
-        if (isStreamTable) {
-          TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER
-        } else {
-          TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER
-        }
-      case (t: TypeInformation[_], name) =>
-        // check if field is registered as time indicator
-        if (getProctimeAttribute(tableSource).contains(name)) {
-          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
-            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
-        }
-        if (getRowtimeAttributes(tableSource).contains(name)) {
-          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
-            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
-        }
-
-        val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
-        // validate that mapped fields are are same type
-        if (tpe != t) {
-          throw new ValidationException(s"Type $t of table field '$name' does not " +
-            s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
-        }
-        idx
-    }
-
-    // ensure that only one field is mapped to an atomic type
-    if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
-      throw new ValidationException(
-        s"More than one table field matched to atomic input type $inputType.")
-    }
-
-    mapping
-  }
-
-  /**
-    * Returns the Calcite schema of a [[TableSource]].
-    *
-    * @param tableSource The [[TableSource]] for which the Calcite schema is generated.
-    * @param selectedFields The indices of all selected fields. None, if all fields are selected.
-    * @param streaming Flag to determine whether the schema of a stream or batch table is created.
-    * @param typeFactory The type factory to create the schema.
-    * @return The Calcite schema for the selected fields of the given [[TableSource]].
-    */
-  def getRelDataType(
-      tableSource: TableSource[_],
-      selectedFields: Option[Array[Int]],
-      streaming: Boolean,
-      typeFactory: FlinkTypeFactory): RelDataType = {
-
-    val fieldNames = tableSource.getTableSchema.getFieldNames
-    var fieldTypes = tableSource.getTableSchema.getFieldTypes
-
-    if (streaming) {
-      // adjust the type of time attributes for streaming tables
-      val rowtimeAttributes = getRowtimeAttributes(tableSource)
-      val proctimeAttributes = getProctimeAttribute(tableSource)
-
-      // patch rowtime fields with time indicator type
-      rowtimeAttributes.foreach { rowtimeField =>
-        val idx = fieldNames.indexOf(rowtimeField)
-        fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1)
-      }
-      // patch proctime field with time indicator type
-      proctimeAttributes.foreach { proctimeField =>
-        val idx = fieldNames.indexOf(proctimeField)
-        fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.PROCTIME_INDICATOR), 1)
-      }
-    }
-
-    val (selectedFieldNames, selectedFieldTypes) = if (selectedFields.isDefined) {
-      // filter field names and types by selected fields
-      (selectedFields.get.map(fieldNames(_)), selectedFields.get.map(fieldTypes(_)))
-    } else {
-      (fieldNames, fieldTypes)
-    }
-    typeFactory.buildLogicalRowType(selectedFieldNames, selectedFieldTypes)
-  }
-
   /**
     * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
     *
@@ -220,207 +81,71 @@ object TableSourceUtil {
   }
 
   /**
-    * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
-    *
-    * @param tableSource The [[TableSource]] for which the expression is extracted.
-    * @param selectedFields The selected fields of the [[TableSource]].
-    *                       If None, all fields are selected.
-    * @param cluster The [[RelOptCluster]] of the current optimization process.
-    * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
-    * @param resultType The result type of the timestamp expression.
-    * @return The [[RexNode]] expression to extract the timestamp of the table source.
-    */
+   * Retrieves an expression to compute a rowtime attribute.
+   *
+   * @param extractor Timestamp extractor to construct an expression for.
+   * @param physicalInputType Physical input type that the timestamp extractor accesses.
+   * @param expectedDataType Expected type of the expression. Different in case of batch and
+   *                         streaming.
+   * @param relBuilder  Builder needed to construct the resulting RexNode.
+   * @param nameMapping Additional remapping of a logical to a physical field name.
+   *                    TimestampExtractor works with logical names, but accesses physical
+   *                    fields
+   * @return The [[RexNode]] expression to extract the timestamp.
+   */
   def getRowtimeExtractionExpression(
-      tableSource: TableSource[_],
-      selectedFields: Option[Array[Int]],
-      cluster: RelOptCluster,
-      relBuilder: RelBuilder,
-      resultType: TypeInformation[_]): Option[RexNode] = {
-
-    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      extractor: TimestampExtractor,
+      physicalInputType: DataType,
+      expectedDataType: DataType,
+      relBuilder: FlinkRelBuilder,
+      nameMapping: JFunction[String, String])
+    : RexNode = {
+    val accessedFields = TimestampExtractorUtils.getAccessedFields(
+      extractor,
+      physicalInputType,
+      nameMapping)
+
+    relBuilder.push(createSchemaRelNode(accessedFields, relBuilder.getCluster))
+    val expr = constructExpression(
+      expectedDataType,
+      extractor,
+      accessedFields
+    ).accept(PlannerExpressionConverter.INSTANCE)
+      .toRexNode(relBuilder)
+    relBuilder.clear()
+    expr
+  }
 
-    /**
-      * Creates a RelNode with a schema that corresponds on the given fields
-      * Fields for which no information is available, will have default values.
-      */
-    def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = {
-      val maxIdx = fields.map(_._2).max
-      val idxMap: Map[Int, (String, TypeInformation[_])] = Map(
-        fields.map(f => f._2 -> (f._1, f._3)): _*)
-      val (physicalFields, physicalTypes) = (0 to maxIdx)
-        .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip
-      val physicalSchema: RelDataType = typeFactory.buildLogicalRowType(
+  private def createSchemaRelNode(
+      fields: Array[ResolvedFieldReference],
+      cluster: RelOptCluster)
+    : RelNode = {
+    val maxIdx = fields.map(_.fieldIndex()).max
+    val idxMap: Map[Int, (String, TypeInformation[_])] = Map(
+      fields.map(f => f.fieldIndex() -> (f.name(), f.resultType())): _*)
+    val (physicalFields, physicalTypes) = (0 to maxIdx)
+      .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip
+    val physicalSchema: RelDataType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+      .buildLogicalRowType(
         physicalFields,
         physicalTypes)
-      LogicalValues.create(
-        cluster,
-        physicalSchema,
-        ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
-    }
-
-    val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields)
-    rowtimeDesc.map { r =>
-      val tsExtractor = r.getTimestampExtractor
-
-      val fieldAccesses = if (tsExtractor.getArgumentFields.nonEmpty) {
-        val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
-        // push an empty values node with the physical schema on the relbuilder
-        relBuilder.push(createSchemaRelNode(resolvedFields))
-        // get extraction expression
-        resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
-      } else {
-        new Array[ResolvedFieldReference](0)
-      }
-
-      val expression = tsExtractor.getExpression(fieldAccesses)
-      // add cast to requested type and convert expression to RexNode
-      // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast
-      // from Timestamp is java.sql.Timestamp. So we need cast to long first.
-      val castExpression = unresolvedCall(CAST,
-        unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT())),
-        typeLiteral(fromLegacyInfoToDataType(resultType)))
-
-      // TODO we convert to planner expressions as a temporary solution
-      val rexExpression = castExpression
-          .accept(PlannerExpressionConverter.INSTANCE)
-          .toRexNode(relBuilder)
-      relBuilder.clear()
-      rexExpression
-    }
-  }
-
-  /**
-    * Returns the indexes of the physical fields that required to compute the given logical fields.
-    *
-    * @param tableSource The [[TableSource]] for which the physical indexes are computed.
-    * @param logicalFieldIndexes The indexes of the accessed logical fields for which the physical
-    *                            indexes are computed.
-    * @return The indexes of the physical fields are accessed to forward and compute the logical
-    *         fields.
-    */
-  def getPhysicalIndexes(
-      tableSource: TableSource[_],
-      logicalFieldIndexes: Array[Int]): Array[Int] = {
-
-    // get the mapping from logical to physical positions.
-    // stream / batch distinction not important here
-    val fieldMapping = computeIndexMapping(tableSource, isStreamTable = true, None)
-
-    logicalFieldIndexes
-      // resolve logical indexes to physical indexes
-      .map(fieldMapping(_))
-      // resolve time indicator markers to physical indexes
-      .flatMap {
-      case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
-        // proctime field do not access a physical field
-        Seq()
-      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
-        // rowtime field is computed.
-        // get names of fields which are accessed by the expression to compute the rowtime field.
-        val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None)
-        val accessedFields = if (rowtimeAttributeDescriptor.isDefined) {
-          rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields
-        } else {
-          throw new TableException("Computed field mapping includes a rowtime marker but the " +
-            "TableSource does not provide a RowtimeAttributeDescriptor. " +
-            "This is a bug and should be reported.")
-        }
-        // resolve field names to physical fields
-        resolveInputFields(accessedFields, tableSource).map(_._2)
-      case idx =>
-        Seq(idx)
-    }
+    LogicalValues.createEmpty(
+      cluster,
+      physicalSchema)
   }
 
-  /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
-  private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
-    tableSource match {
-      case r: DefinedRowtimeAttributes =>
-        r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
-      case _ =>
-        Array()
-    }
+  private def constructExpression(
+      expectedType: DataType,
+      timestampExtractor: TimestampExtractor,
+      fieldAccesses: Array[ResolvedFieldReference])
+    : Expression = {
+    val expression = timestampExtractor.getExpression(fieldAccesses)
+    // add cast to requested type and convert expression to RexNode
+    // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast
+    // from Timestamp is java.sql.Timestamp. So we need cast to long first.
+    unresolvedCall(
+      CAST,
+      unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT)),
+      typeLiteral(expectedType))
   }
-
-  /** Returns the proctime attribute of the [[TableSource]] if it is defined. */
-  private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = {
-    tableSource match {
-      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null =>
-        Some(p.getProctimeAttribute)
-      case _ =>
-        None
-    }
-  }
-
-  /**
-    * Identifies for a field name of the logical schema, the corresponding physical field in the
-    * return type of a [[TableSource]].
-    *
-    * @param fieldName The logical field to look up.
-    * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and type information of the physical field.
-    */
-  private def resolveInputField(
-      fieldName: String,
-      tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
-
-    val returnType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType)
-
-    /** Look up a field by name in a type information */
-    def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
-      returnType match {
-
-        case c: CompositeType[_] =>
-          // get and check field index
-          val idx = c.getFieldIndex(fieldName)
-          if (idx < 0) {
-            throw new ValidationException(failMsg)
-          }
-          // return field name, index, and field type
-          (fieldName, idx, c.getTypeAt(idx))
-
-        case t: TypeInformation[_] =>
-          // no composite type, we return the full atomic type as field
-          (fieldName, 0, t)
-      }
-    }
-
-    tableSource match {
-      case d: DefinedFieldMapping if d.getFieldMapping != null =>
-        // resolve field name in field mapping
-        val resolvedFieldName = d.getFieldMapping.get(fieldName)
-        if (resolvedFieldName == null) {
-          throw new ValidationException(
-            s"Field '$fieldName' could not be resolved by the field mapping.")
-        }
-        // look up resolved field in return type
-        lookupField(
-          resolvedFieldName,
-          s"Table field '$fieldName' was resolved to TableSource return type field " +
-            s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
-            s"type $returnType of the TableSource. " +
-            s"Please verify the field mapping of the TableSource.")
-      case _ =>
-        // look up field in return type
-        lookupField(
-          fieldName,
-          s"Table field '$fieldName' was not found in the return type $returnType of the " +
-            s"TableSource.")
-    }
-  }
-
-  /**
-    * Identifies the physical fields in the return type [[TypeInformation]] of a [[TableSource]]
-    * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
-    *
-    * @param fieldNames The field names to look up.
-    * @param tableSource The table source in which to look for the field.
-    * @return The name, index, and type information of the physical field.
-    */
-  private def resolveInputFields(
-      fieldNames: Array[String],
-      tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = {
-    fieldNames.map(resolveInputField(_, tableSource))
-  }
-
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java
index 94dcac5..cbdfcc0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import java.util.Collections;
@@ -270,20 +270,20 @@ public class CatalogStructureBuilder {
                                }
 
                                @Override
-                               public TypeInformation<Row> getReturnType() {
-                                       return tableSchema.toRowType();
+                               public DataType getProducedDataType() {
+                                       return tableSchema.toRowDataType();
                                }
 
                                @Override
                                public TableSchema getTableSchema() {
-                                       return tableSchema;
+                                       throw new UnsupportedOperationException("Should not be called");
                                }
 
                                @Override
                                public String explainSource() {
                                        return String.format("isTemporary=[%s]", isTemporary);
                                }
-                       }, null, TableSchema.builder().build(), false);
+                       }, null, tableSchema, false);
 
                        this.fullyQualifiedPath = fullyQualifiedPath;
                        this.isTemporary = isTemporary;
