This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 422ab8a928413dde21e5c4c8a4068751b7d23fb7
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri Dec 13 09:48:38 2019 +0100

    [FLINK-15168][table-planner] Fix physical indices computing.

    Starting from this commit we use the schema that comes from CatalogTable
    instead of the schema that comes from TableSource. Computing physical
    indices is based on the new type hierarchy instead of TypeInformation.
---
 .../flink/table/catalog/ConnectorCatalogTable.java |   6 +-
 .../flink/table/catalog/DatabaseCalciteSchema.java |   2 +
 .../flink/table/plan/QueryOperationConverter.java  |   2 +
 .../table/plan/nodes/PhysicalTableSourceScan.scala |  15 +-
 .../plan/nodes/dataset/BatchTableSourceScan.scala  |  82 ++--
 .../nodes/datastream/StreamTableSourceScan.scala   |  77 ++--
 .../logical/FlinkLogicalTableSourceScan.scala      |  38 +-
 .../rules/dataSet/BatchTableSourceScanRule.scala   |   1 +
 .../datastream/StreamTableSourceScanRule.scala     |   1 +
 .../PushFilterIntoTableSourceScanRule.scala        |   2 +-
 .../PushProjectIntoTableSourceScanRule.scala       | 103 ++++-
 .../flink/table/plan/schema/TableSourceTable.scala |  66 +++-
 .../flink/table/sources/TableSourceUtil.scala      | 417 ++++-----------------
 .../table/catalog/CatalogStructureBuilder.java     |  10 +-
 14 files changed, 377 insertions(+), 445 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 1ea8b6f..2113a66 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -55,16 +55,16 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
     // NOTES: this should be false in BLINK planner, because BLINK planner always uses StreamTableSource.
private final boolean isBatch; - public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) { + public static <T1> ConnectorCatalogTable<T1, ?> source(TableSource<T1> source, boolean isBatch) { final TableSchema tableSchema = calculateSourceSchema(source, isBatch); return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch); } - public static <T2> ConnectorCatalogTable sink(TableSink<T2> sink, boolean isBatch) { + public static <T2> ConnectorCatalogTable<?, T2> sink(TableSink<T2> sink, boolean isBatch) { return new ConnectorCatalogTable<>(null, sink, sink.getTableSchema(), isBatch); } - public static <T1, T2> ConnectorCatalogTable sourceAndSink( + public static <T1, T2> ConnectorCatalogTable<T1, T2> sourceAndSink( TableSource<T1> source, TableSink<T2> sink, boolean isBatch) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java index 81f8579..1dd5b23 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java @@ -105,6 +105,7 @@ class DatabaseCalciteSchema implements Schema { private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) { Optional<TableSourceTable> tableSourceTable = table.getTableSource() .map(tableSource -> new TableSourceTable<>( + table.getSchema(), tableSource, !table.isBatch(), FlinkStatistic.UNKNOWN())); @@ -142,6 +143,7 @@ class DatabaseCalciteSchema implements Schema { } return new TableSourceTable<>( + table.getSchema(), tableSource, // this means the TableSource extends from StreamTableSource, this is needed for the // legacy Planner. 
Blink Planner should use the information that comes from the TableSource diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java index 3709dd5..2855206 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java @@ -299,6 +299,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod @Override public <U> RelNode visit(TableSourceQueryOperation<U> tableSourceTable) { final Table relTable = new TableSourceTable<>( + tableSourceTable.getTableSchema(), tableSourceTable.getTableSource(), !tableSourceTable.isBatch(), FlinkStatistic.UNKNOWN()); @@ -316,6 +317,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod relTable.getRowType(relBuilder.getTypeFactory()), relTable, Schemas.path(catalogReader.getRootSchema(), Collections.singletonList(refId))), + tableSourceTable.getTableSchema(), tableSourceTable.getTableSource(), Option.empty() ); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala index 4aad828..0efdd14 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala @@ -18,12 +18,12 @@ package org.apache.flink.table.plan.nodes +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.sources.TableSource + import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet} import org.apache.calcite.rel.RelWriter -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil} import scala.collection.JavaConverters._ @@ -31,18 +31,11 @@ abstract class PhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, + tableSchema: TableSchema, tableSource: TableSource[_], val selectedFields: Option[Array[Int]]) extends TableScan(cluster, traitSet, table) { - override def deriveRowType(): RelDataType = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - val streamingTable = tableSource.isInstanceOf[StreamTableSource[_]] - - TableSourceUtil - .getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory) - } - override def explainTerms(pw: RelWriter): RelWriter = { val terms = super.explainTerms(pw) .item("fields", deriveRowType().getFieldNames.asScala.mkString(", ")) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 8e91e90..a9e73de 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ 
-18,41 +18,54 @@ package org.apache.flink.table.plan.nodes.dataset -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rex.RexNode import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.core.io.InputSplit import org.apache.flink.table.api.internal.BatchTableEnvImpl -import org.apache.flink.table.api.{BatchQueryConfig, TableException, Types} -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.{BatchQueryConfig, TableException, TableSchema, Types} import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.sources._ +import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType} +import org.apache.flink.table.utils.TypeMappingUtils import org.apache.flink.types.Row +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery + +import java.util.function.{Function => JFunction} + +import scala.collection.JavaConverters._ + /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, + tableSchema: TableSchema, tableSource: TableSource[_], selectedFields: Option[Array[Int]]) - extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields) + extends PhysicalTableSourceScan( + cluster, + traitSet, + table, + tableSchema, + tableSource, + selectedFields) with BatchScan { override def deriveRowType(): RelDataType = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - TableSourceUtil.getRelDataType( - tableSource, - selectedFields, - streaming = false, - flinkTypeFactory) + val baseRowType = table.getRowType + selectedFields.map(idxs => { + val fields = baseRowType.getFieldList + val builder = cluster.getTypeFactory.builder() + idxs.map(fields.get).foreach(builder.add) + builder.build() + }).getOrElse(baseRowType) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { @@ -65,6 +78,7 @@ class BatchTableSourceScan( cluster, traitSet, getTable, + tableSchema, tableSource, selectedFields ) @@ -78,6 +92,7 @@ class BatchTableSourceScan( cluster, traitSet, getTable, + tableSchema, tableSource, selectedFields ) @@ -87,11 +102,6 @@ class BatchTableSourceScan( tableEnv: BatchTableEnvImpl, queryConfig: BatchQueryConfig): DataSet[Row] = { - val fieldIndexes = TableSourceUtil.computeIndexMapping( - tableSource, - isStreamTable = false, - selectedFields) - val config = tableEnv.getConfig val inputDataSet = tableSource match { case batchSource: BatchTableSource[_] => @@ -108,8 +118,6 @@ class BatchTableSourceScan( case _ => throw new TableException("Only BatchTableSource and InputFormatTableSource are " + "supported in BatchTableEnvironment.") } - val outputSchema = new RowSchema(this.getRowType) - val inputDataType = fromLegacyInfoToDataType(inputDataSet.getType) val producedDataType = tableSource.getProducedDataType @@ -121,18 +129,36 @@ class BatchTableSourceScan( s"Please 
validate the implementation of the TableSource.") } - // get expression to extract rowtime attribute - val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + val nameMapping: JFunction[String, String] = tableSource match { + case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => + new JFunction[String, String] { + override def apply(t: String): String = mapping.getFieldMapping.get(t) + } + case _ => JFunction.identity() + } + + val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( tableSource, - selectedFields, - cluster, - tableEnv.getRelBuilder, - Types.SQL_TIMESTAMP + selectedFields.map(_.map(tableSchema.getTableColumn(_).get()).toList.asJava) + .getOrElse(tableSchema.getTableColumns), + false, + nameMapping ) + val rowtimeExpression = TableSourceUtil.getRowtimeAttributeDescriptor( + tableSource, + selectedFields) + .map(desc => TableSourceUtil.getRowtimeExtractionExpression( + desc.getTimestampExtractor, + producedDataType, + TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP()), + tableEnv.getRelBuilder, + nameMapping + )) + // ingest table and convert and extract time attributes if necessary convertToInternalRow( - outputSchema, + new RowSchema(getRowType), inputDataSet, fieldIndexes, config, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index ab2ef0f..573bfff 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -18,42 +18,55 @@ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rex.RexNode import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.table.api.{StreamQueryConfig, TableException} -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.api.{StreamQueryConfig, TableException, TableSchema} import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.planner.StreamPlanner import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources._ import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PreserveWatermarks, PunctuatedWatermarkAssigner} +import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import org.apache.flink.table.utils.TypeMappingUtils + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery + +import java.util.function.{Function => JFunction} + +import scala.collection.JavaConverters._ /** Flink RelNode to read data from an external 
source defined by a [[StreamTableSource]]. */ class StreamTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, + tableSchema: TableSchema, tableSource: StreamTableSource[_], selectedFields: Option[Array[Int]]) - extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields) + extends PhysicalTableSourceScan( + cluster, + traitSet, + table, + tableSchema, + tableSource, + selectedFields) with StreamScan { override def deriveRowType(): RelDataType = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - TableSourceUtil.getRelDataType( - tableSource, - selectedFields, - streaming = true, - flinkTypeFactory) + val baseRowType = table.getRowType + selectedFields.map(idxs => { + val fields = baseRowType.getFieldList + val builder = cluster.getTypeFactory.builder() + idxs.map(fields.get).foreach(builder.add) + builder.build() + }).getOrElse(baseRowType) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { @@ -66,6 +79,7 @@ class StreamTableSourceScan( cluster, traitSet, getTable, + tableSchema, tableSource, selectedFields ) @@ -79,6 +93,7 @@ class StreamTableSourceScan( cluster, traitSet, getTable, + tableSchema, newTableSource.asInstanceOf[StreamTableSource[_]], selectedFields ) @@ -88,11 +103,6 @@ class StreamTableSourceScan( planner: StreamPlanner, queryConfig: StreamQueryConfig): DataStream[CRow] = { - val fieldIndexes = TableSourceUtil.computeIndexMapping( - tableSource, - isStreamTable = true, - selectedFields) - val config = planner.getConfig val inputDataStream = tableSource.getDataStream(planner.getExecutionEnvironment) .asInstanceOf[DataStream[Any]] @@ -109,13 +119,32 @@ class StreamTableSourceScan( s"Please validate the implementation of the TableSource.") } + val nameMapping: JFunction[String, String] = tableSource match { + case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => + new JFunction[String, String] { + override def apply(t: String): String = mapping.getFieldMapping.get(t) + } + case _ => JFunction.identity() + } + // get expression to extract rowtime attribute - val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + val rowtimeExpression = TableSourceUtil.getRowtimeAttributeDescriptor( tableSource, - selectedFields, - cluster, - planner.getRelBuilder, - TimeIndicatorTypeInfo.ROWTIME_INDICATOR + selectedFields) + .map(desc => TableSourceUtil.getRowtimeExtractionExpression( + desc.getTimestampExtractor, + producedDataType, + TypeConversions.fromLegacyInfoToDataType(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), + planner.getRelBuilder, + nameMapping + )) + + val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( + tableSource, + selectedFields.map(_.map(tableSchema.getTableColumn(_).get()).toList.asJava) + .getOrElse(tableSchema.getTableColumns), + true, + nameMapping ) // ingest table and convert and extract time attributes if necessary diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index fb0a09c..d444816 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -18,6 +18,13 @@ package org.apache.flink.table.plan.nodes.logical +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.{FilterableTableSource, TableSource} +import org.apache.flink.table.types.utils.{DataTypeUtils, TypeConversions} + import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.ConverterRule @@ -25,10 +32,6 @@ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.nodes.FlinkConventions -import org.apache.flink.table.plan.schema.TableSourceTable -import org.apache.flink.table.sources.{FilterableTableSource, TableSource, TableSourceUtil} import scala.collection.JavaConverters._ @@ -36,6 +39,7 @@ class FlinkLogicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, + val tableSchema: TableSchema, val tableSource: TableSource[_], val selectedFields: Option[Array[Int]]) extends TableScan(cluster, traitSet, table) @@ -43,16 +47,26 @@ class FlinkLogicalTableSourceScan( def copy( traitSet: RelTraitSet, + tableSchema: TableSchema, tableSource: TableSource[_], selectedFields: Option[Array[Int]]): FlinkLogicalTableSourceScan = { - new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource, selectedFields) + new FlinkLogicalTableSourceScan( + cluster, + traitSet, + getTable, + tableSchema, + tableSource, + selectedFields) } override def deriveRowType(): RelDataType = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreamingMode - - TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory) + val baseRowType = table.getRowType + selectedFields.map(idxs => { + val fields = baseRowType.getFieldList + val builder = cluster.getTypeFactory.builder() + idxs.map(fields.get).foreach(builder.add) + builder.build() + }).getOrElse(baseRowType) } override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { @@ -74,7 +88,7 @@ class FlinkLogicalTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { val terms = super.explainTerms(pw) - .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", ")) + .item("fields", tableSchema.getFieldNames.mkString(", ")) val sourceDesc = tableSource.explainSource() if (sourceDesc.nonEmpty) { @@ -113,11 +127,13 @@ class FlinkLogicalTableSourceScanConverter val scan = rel.asInstanceOf[TableScan] val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL) + val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]]) new FlinkLogicalTableSourceScan( rel.getCluster, traitSet, scan.getTable, - scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource, + tableSourceTable.tableSchema, + tableSourceTable.tableSource, None ) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index e45ad85..8942835 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -50,6 +50,7 @@ class BatchTableSourceScanRule rel.getCluster, traitSet, scan.getTable, + scan.tableSchema, scan.tableSource, scan.selectedFields ) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala index 1ddc1c3..959c893 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala @@ -52,6 +52,7 @@ class StreamTableSourceScanRule rel.getCluster, traitSet, scan.getTable, + scan.tableSchema, scan.tableSource.asInstanceOf[StreamTableSource[_]], scan.selectedFields ) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala index 44b100f..a8fbd78 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala @@ -113,7 +113,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule( // check whether we still need a RexProgram. An RexProgram is needed when either // projection or filter exists. 
- val newScan = scan.copy(scan.getTraitSet, newTableSource, scan.selectedFields) + val newScan = scan.copy(scan.getTraitSet, scan.tableSchema, newTableSource, scan.selectedFields) val newRexProgram = { if (remainingCondition != null || !program.projectsOnlyIdentity) { val expandedProjectList = program.getProjectList.asScala diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala index 9864cd0..932b745 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala @@ -18,12 +18,22 @@ package org.apache.flink.table.plan.rules.logical -import org.apache.calcite.plan.RelOptRule.{none, operand} -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter} +import org.apache.flink.table.api.{TableException, TableSchema} import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan} +import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter} +import org.apache.flink.table.sources.TableSourceUtil.getRowtimeAttributeDescriptor import org.apache.flink.table.sources._ +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks +import org.apache.flink.table.types.utils.DataTypeUtils +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import org.apache.flink.table.utils.TypeMappingUtils + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} + +import java.util.function.{Function => JFunction} + +import scala.collection.JavaConverters._ class PushProjectIntoTableSourceScanRule extends RelOptRule( operand(classOf[FlinkLogicalCalc], @@ -43,7 +53,29 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule( val source = scan.tableSource val accessedLogicalFields = RexProgramExtractor.extractRefInputFields(calc.getProgram) - val accessedPhysicalFields = TableSourceUtil.getPhysicalIndexes(source, accessedLogicalFields) + + val nameMapping: JFunction[String, String] = source match { + case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => + new JFunction[String, String] { + override def apply(t: String): String = mapping.getFieldMapping.get(t) + } + case _ => JFunction.identity() + } + + val accessedIndices = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( + source, + accessedLogicalFields.map(scan.tableSchema.getTableColumn(_).get()).toList.asJava, + source.isInstanceOf[StreamTableSource[_]], + nameMapping + ) + val physicalFields = expandTimeAttributes( + source, + if (LogicalTypeChecks.isCompositeType(source.getProducedDataType.getLogicalType)) { + DataTypeUtils.expandCompositeTypeToSchema(source.getProducedDataType) + } else { + null + }, + accessedIndices) // only continue if fields are projected or reordered. // eager reordering can remove a calc operator. 
@@ -53,10 +85,10 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule( val (newTableSource, isProjectSuccess) = source match { case nested: NestedFieldsProjectableTableSource[_] => val nestedFields = RexProgramExtractor - .extractRefNestedInputFields(calc.getProgram, accessedPhysicalFields) - (nested.projectNestedFields(accessedPhysicalFields, nestedFields), true) + .extractRefNestedInputFields(calc.getProgram, physicalFields) + (nested.projectNestedFields(physicalFields, nestedFields), true) case projecting: ProjectableTableSource[_] => - (projecting.projectFields(accessedPhysicalFields), true) + (projecting.projectFields(physicalFields), true) case nonProjecting: TableSource[_] => // projection cannot be pushed to TableSource (nonProjecting, false) @@ -77,7 +109,11 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule( } // Apply the projection during the input conversion of the scan. - val newScan = scan.copy(scan.getTraitSet, newTableSource, Some(accessedLogicalFields)) + val newScan = scan.copy( + scan.getTraitSet, + scan.tableSchema, + newTableSource, + Some(accessedLogicalFields)) val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection( calc.getProgram, newScan.getRowType, @@ -93,6 +129,55 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule( } } } + + private def expandTimeAttributes( + tableSource: TableSource[_], + physicalSchema: TableSchema, + physicalIndices: Array[Int]): Array[Int] = { + + physicalIndices + // resolve time indicator markers to physical indexes + .flatMap { + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => + // proctime field do not access a physical field + Seq() + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => + // rowtime field is computed. + // get names of fields which are accessed by the expression to compute the rowtime field. + val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None) + val accessedFields = if (rowtimeAttributeDescriptor.isDefined) { + rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields + } else { + throw new TableException("Computed field mapping includes a rowtime marker but the " + + "TableSource does not provide a RowtimeAttributeDescriptor. 
" + + "This is a bug and should be reported.") + } + // resolve field names to physical fields + accessedFields.map(name => { + if (physicalSchema != null) { + mapToCompositeType(tableSource, physicalSchema, name) + } else { + 0 + } + }) + case idx => + Seq(idx) + } + } + + private def mapToCompositeType( + tableSource: TableSource[_], + physicalSchema: TableSchema, + name: String) + : Int = { + val fieldName = tableSource match { + case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => + mapping.getFieldMapping.get(name) + case _ => + name + } + physicalSchema.getFieldNames.indexWhere(_.equals(fieldName)) // logical accessed field + } } object PushProjectIntoTableSourceScanRule { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index a134968..eedc105 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -18,12 +18,21 @@ package org.apache.flink.table.plan.schema +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.{DefinedFieldMapping, TableSource, TableSourceValidation} +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks +import org.apache.flink.table.types.utils.{DataTypeUtils, TypeConversions} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import org.apache.flink.table.utils.TypeMappingUtils + import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.Statistic import org.apache.calcite.schema.impl.AbstractTable -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSourceValidation} + +import java.util.function.{Function => JFunction} /** * Abstract class which define the interfaces required to convert a [[TableSource]] to @@ -34,12 +43,13 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSource * @param statistic The table statistics. */ class TableSourceTable[T]( + val tableSchema: TableSchema, val tableSource: TableSource[T], val isStreamingMode: Boolean, val statistic: FlinkStatistic) extends AbstractTable { - TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema) + TableSourceValidation.validateTableSource(tableSource, tableSchema) /** * Returns statistics of current table @@ -48,11 +58,53 @@ class TableSourceTable[T]( */ override def getStatistic: Statistic = statistic + // We must enrich logical schema from catalog table with physical type coming from table source. + // Schema coming from catalog table might not have proper conversion classes. 
Those must be + // extracted from produced type, before converting to RelDataType def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - TableSourceUtil.getRelDataType( + + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + + val fieldNames = tableSchema.getFieldNames + + val nameMapping: JFunction[String, String] = tableSource match { + case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => + new JFunction[String, String] { + override def apply(t: String): String = mapping.getFieldMapping.get(t) + } + case _ => JFunction.identity() + } + + val producedDataType = tableSource.getProducedDataType + val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( tableSource, - None, + tableSchema.getTableColumns, isStreamingMode, - typeFactory.asInstanceOf[FlinkTypeFactory]) + nameMapping + ) + + val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) { + val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType) + fieldIndexes.map(mapIndex(_, + idx => + TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())) + ) + } else { + fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType))) + } + + flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos) + } + + def mapIndex(idx: Int, mapNonMarker: Int => TypeInformation[_]): TypeInformation[_] = { + idx match { + case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER => Types.SQL_TIMESTAMP() + case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER => Types.SQL_TIMESTAMP() + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => + TimeIndicatorTypeInfo.PROCTIME_INDICATOR + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case _ => + mapNonMarker(idx) + } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala index f91f2fa..61e5957 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -18,165 +18,26 @@ package org.apache.flink.table.sources -import java.sql.Timestamp -import com.google.common.collect.ImmutableList +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{DataTypes, Types, ValidationException} +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} +import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall} +import org.apache.flink.table.expressions.{Expression, PlannerExpressionConverter, ResolvedFieldReference} +import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST +import org.apache.flink.table.sources.tsextractors.{TimestampExtractor, TimestampExtractorUtils} +import org.apache.flink.table.types.DataType + import org.apache.calcite.plan.RelOptCluster import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.logical.LogicalValues -import org.apache.calcite.rex.{RexLiteral, RexNode} -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.{DataTypes, TableException, 
Types, ValidationException} -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall} -import org.apache.flink.table.expressions.{PlannerExpressionConverter, ResolvedFieldReference} -import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST -import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType} -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import org.apache.calcite.rex.RexNode -import scala.collection.JavaConverters._ +import java.util.function.{Function => JFunction} /** Util class for [[TableSource]]. */ object TableSourceUtil { - /** Returns true if the [[TableSource]] has a rowtime attribute. */ - def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean = - getRowtimeAttributes(tableSource).nonEmpty - - /** Returns true if the [[TableSource]] has a proctime attribute. */ - def hasProctimeAttribute(tableSource: TableSource[_]): Boolean = - getProctimeAttribute(tableSource).nonEmpty - - /** - * Computes the indices that map the input type of the DataStream to the schema of the table. - * - * The mapping is based on the field names and fails if a table field cannot be - * mapped to a field of the input type. - * - * @param tableSource The table source for which the table schema is mapped to the input type. - * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise. - * @param selectedFields The indexes of the table schema fields for which a mapping is - * computed. If None, a mapping for all fields is computed. - * @return An index mapping from input type to table schema. - */ - def computeIndexMapping( - tableSource: TableSource[_], - isStreamTable: Boolean, - selectedFields: Option[Array[Int]]): Array[Int] = { - val inputType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType) - val tableSchema = tableSource.getTableSchema - - // get names of selected fields - val tableFieldNames = if (selectedFields.isDefined) { - val names = tableSchema.getFieldNames - selectedFields.get.map(names(_)) - } else { - tableSchema.getFieldNames - } - - // get types of selected fields - val tableFieldTypes = if (selectedFields.isDefined) { - val types = tableSchema.getFieldTypes - selectedFields.get.map(types(_)) - } else { - tableSchema.getFieldTypes - } - - // get rowtime and proctime attributes - val rowtimeAttributes = getRowtimeAttributes(tableSource) - val proctimeAttributes = getProctimeAttribute(tableSource) - - // compute mapping of selected fields and time attributes - val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map { - case (t: SqlTimeTypeInfo[_], name: String) - if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) => - if (isStreamTable) { - TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER - } else { - TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER - } - case (t: SqlTimeTypeInfo[_], name: String) - if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) => - if (isStreamTable) { - TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER - } else { - TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER - } - case (t: TypeInformation[_], name) => - // check if field is registered as time indicator - if (getProctimeAttribute(tableSource).contains(name)) { - throw new ValidationException(s"Processing time field '$name' has invalid type $t. 
" + - s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.") - } - if (getRowtimeAttributes(tableSource).contains(name)) { - throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " + - s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.") - } - - val (physicalName, idx, tpe) = resolveInputField(name, tableSource) - // validate that mapped fields are are same type - if (tpe != t) { - throw new ValidationException(s"Type $t of table field '$name' does not " + - s"match with type $tpe of the field '$physicalName' of the TableSource return type.") - } - idx - } - - // ensure that only one field is mapped to an atomic type - if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) { - throw new ValidationException( - s"More than one table field matched to atomic input type $inputType.") - } - - mapping - } - - /** - * Returns the Calcite schema of a [[TableSource]]. - * - * @param tableSource The [[TableSource]] for which the Calcite schema is generated. - * @param selectedFields The indices of all selected fields. None, if all fields are selected. - * @param streaming Flag to determine whether the schema of a stream or batch table is created. - * @param typeFactory The type factory to create the schema. - * @return The Calcite schema for the selected fields of the given [[TableSource]]. - */ - def getRelDataType( - tableSource: TableSource[_], - selectedFields: Option[Array[Int]], - streaming: Boolean, - typeFactory: FlinkTypeFactory): RelDataType = { - - val fieldNames = tableSource.getTableSchema.getFieldNames - var fieldTypes = tableSource.getTableSchema.getFieldTypes - - if (streaming) { - // adjust the type of time attributes for streaming tables - val rowtimeAttributes = getRowtimeAttributes(tableSource) - val proctimeAttributes = getProctimeAttribute(tableSource) - - // patch rowtime fields with time indicator type - rowtimeAttributes.foreach { rowtimeField => - val idx = fieldNames.indexOf(rowtimeField) - fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1) - } - // patch proctime field with time indicator type - proctimeAttributes.foreach { proctimeField => - val idx = fieldNames.indexOf(proctimeField) - fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.PROCTIME_INDICATOR), 1) - } - } - - val (selectedFieldNames, selectedFieldTypes) = if (selectedFields.isDefined) { - // filter field names and types by selected fields - (selectedFields.get.map(fieldNames(_)), selectedFields.get.map(fieldTypes(_))) - } else { - (fieldNames, fieldTypes) - } - typeFactory.buildLogicalRowType(selectedFieldNames, selectedFieldTypes) - } - /** * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]]. * @@ -220,207 +81,71 @@ object TableSourceUtil { } /** - * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]]. - * - * @param tableSource The [[TableSource]] for which the expression is extracted. - * @param selectedFields The selected fields of the [[TableSource]]. - * If None, all fields are selected. - * @param cluster The [[RelOptCluster]] of the current optimization process. - * @param relBuilder The [[RelBuilder]] to build the [[RexNode]]. - * @param resultType The result type of the timestamp expression. - * @return The [[RexNode]] expression to extract the timestamp of the table source. - */ + * Retrieves an expression to compute a rowtime attribute. + * + * @param extractor Timestamp extractor to construct an expression for. 
+ * @param physicalInputType Physical input type that the timestamp extractor accesses. + * @param expectedDataType Expected type of the expression. Different in case of batch and + * streaming. + * @param relBuilder Builder needed to construct the resulting RexNode. + * @param nameMapping Additional remapping of a logical to a physical field name. + * TimestampExtractor works with logical names, but accesses physical + * fields + * @return The [[RexNode]] expression to extract the timestamp. + */ def getRowtimeExtractionExpression( - tableSource: TableSource[_], - selectedFields: Option[Array[Int]], - cluster: RelOptCluster, - relBuilder: RelBuilder, - resultType: TypeInformation[_]): Option[RexNode] = { - - val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + extractor: TimestampExtractor, + physicalInputType: DataType, + expectedDataType: DataType, + relBuilder: FlinkRelBuilder, + nameMapping: JFunction[String, String]) + : RexNode = { + val accessedFields = TimestampExtractorUtils.getAccessedFields( + extractor, + physicalInputType, + nameMapping) + + relBuilder.push(createSchemaRelNode(accessedFields, relBuilder.getCluster)) + val expr = constructExpression( + expectedDataType, + extractor, + accessedFields + ).accept(PlannerExpressionConverter.INSTANCE) + .toRexNode(relBuilder) + relBuilder.clear() + expr + } - /** - * Creates a RelNode with a schema that corresponds on the given fields - * Fields for which no information is available, will have default values. - */ - def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = { - val maxIdx = fields.map(_._2).max - val idxMap: Map[Int, (String, TypeInformation[_])] = Map( - fields.map(f => f._2 -> (f._1, f._3)): _*) - val (physicalFields, physicalTypes) = (0 to maxIdx) - .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip - val physicalSchema: RelDataType = typeFactory.buildLogicalRowType( + private def createSchemaRelNode( + fields: Array[ResolvedFieldReference], + cluster: RelOptCluster) + : RelNode = { + val maxIdx = fields.map(_.fieldIndex()).max + val idxMap: Map[Int, (String, TypeInformation[_])] = Map( + fields.map(f => f.fieldIndex() -> (f.name(), f.resultType())): _*) + val (physicalFields, physicalTypes) = (0 to maxIdx) + .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip + val physicalSchema: RelDataType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + .buildLogicalRowType( physicalFields, physicalTypes) - LogicalValues.create( - cluster, - physicalSchema, - ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]]) - } - - val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields) - rowtimeDesc.map { r => - val tsExtractor = r.getTimestampExtractor - - val fieldAccesses = if (tsExtractor.getArgumentFields.nonEmpty) { - val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource) - // push an empty values node with the physical schema on the relbuilder - relBuilder.push(createSchemaRelNode(resolvedFields)) - // get extraction expression - resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2)) - } else { - new Array[ResolvedFieldReference](0) - } - - val expression = tsExtractor.getExpression(fieldAccesses) - // add cast to requested type and convert expression to RexNode - // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast - // from Timestamp is java.sql.Timestamp. So we need cast to long first. 
- val castExpression = unresolvedCall(CAST, - unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT())), - typeLiteral(fromLegacyInfoToDataType(resultType))) - - // TODO we convert to planner expressions as a temporary solution - val rexExpression = castExpression - .accept(PlannerExpressionConverter.INSTANCE) - .toRexNode(relBuilder) - relBuilder.clear() - rexExpression - } - } - - /** - * Returns the indexes of the physical fields that required to compute the given logical fields. - * - * @param tableSource The [[TableSource]] for which the physical indexes are computed. - * @param logicalFieldIndexes The indexes of the accessed logical fields for which the physical - * indexes are computed. - * @return The indexes of the physical fields are accessed to forward and compute the logical - * fields. - */ - def getPhysicalIndexes( - tableSource: TableSource[_], - logicalFieldIndexes: Array[Int]): Array[Int] = { - - // get the mapping from logical to physical positions. - // stream / batch distinction not important here - val fieldMapping = computeIndexMapping(tableSource, isStreamTable = true, None) - - logicalFieldIndexes - // resolve logical indexes to physical indexes - .map(fieldMapping(_)) - // resolve time indicator markers to physical indexes - .flatMap { - case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => - // proctime field do not access a physical field - Seq() - case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => - // rowtime field is computed. - // get names of fields which are accessed by the expression to compute the rowtime field. - val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None) - val accessedFields = if (rowtimeAttributeDescriptor.isDefined) { - rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields - } else { - throw new TableException("Computed field mapping includes a rowtime marker but the " + - "TableSource does not provide a RowtimeAttributeDescriptor. " + - "This is a bug and should be reported.") - } - // resolve field names to physical fields - resolveInputFields(accessedFields, tableSource).map(_._2) - case idx => - Seq(idx) - } + LogicalValues.createEmpty( + cluster, + physicalSchema) } - /** Returns a list with all rowtime attribute names of the [[TableSource]]. */ - private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = { - tableSource match { - case r: DefinedRowtimeAttributes => - r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray - case _ => - Array() - } + private def constructExpression( + expectedType: DataType, + timestampExtractor: TimestampExtractor, + fieldAccesses: Array[ResolvedFieldReference]) + : Expression = { + val expression = timestampExtractor.getExpression(fieldAccesses) + // add cast to requested type and convert expression to RexNode + // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast + // from Timestamp is java.sql.Timestamp. So we need cast to long first. + unresolvedCall( + CAST, + unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT)), + typeLiteral(expectedType)) } - - /** Returns the proctime attribute of the [[TableSource]] if it is defined. 
*/ - private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = { - tableSource match { - case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => - Some(p.getProctimeAttribute) - case _ => - None - } - } - - /** - * Identifies for a field name of the logical schema, the corresponding physical field in the - * return type of a [[TableSource]]. - * - * @param fieldName The logical field to look up. - * @param tableSource The table source in which to look for the field. - * @return The name, index, and type information of the physical field. - */ - private def resolveInputField( - fieldName: String, - tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = { - - val returnType = fromDataTypeToLegacyInfo(tableSource.getProducedDataType) - - /** Look up a field by name in a type information */ - def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = { - returnType match { - - case c: CompositeType[_] => - // get and check field index - val idx = c.getFieldIndex(fieldName) - if (idx < 0) { - throw new ValidationException(failMsg) - } - // return field name, index, and field type - (fieldName, idx, c.getTypeAt(idx)) - - case t: TypeInformation[_] => - // no composite type, we return the full atomic type as field - (fieldName, 0, t) - } - } - - tableSource match { - case d: DefinedFieldMapping if d.getFieldMapping != null => - // resolve field name in field mapping - val resolvedFieldName = d.getFieldMapping.get(fieldName) - if (resolvedFieldName == null) { - throw new ValidationException( - s"Field '$fieldName' could not be resolved by the field mapping.") - } - // look up resolved field in return type - lookupField( - resolvedFieldName, - s"Table field '$fieldName' was resolved to TableSource return type field " + - s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " + - s"type $returnType of the TableSource. " + - s"Please verify the field mapping of the TableSource.") - case _ => - // look up field in return type - lookupField( - fieldName, - s"Table field '$fieldName' was not found in the return type $returnType of the " + - s"TableSource.") - } - } - - /** - * Identifies the physical fields in the return type [[TypeInformation]] of a [[TableSource]] - * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]]. - * - * @param fieldNames The field names to look up. - * @param tableSource The table source in which to look for the field. - * @return The name, index, and type information of the physical field. 
- */ - private def resolveInputFields( - fieldNames: Array[String], - tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = { - fieldNames.map(resolveInputField(_, tableSource)) - } - } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java index 94dcac5..cbdfcc0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogStructureBuilder.java @@ -18,11 +18,11 @@ package org.apache.flink.table.catalog; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import java.util.Collections; @@ -270,20 +270,20 @@ public class CatalogStructureBuilder { } @Override - public TypeInformation<Row> getReturnType() { - return tableSchema.toRowType(); + public DataType getProducedDataType() { + return tableSchema.toRowDataType(); } @Override public TableSchema getTableSchema() { - return tableSchema; + throw new UnsupportedOperationException("Should not be called"); } @Override public String explainSource() { return String.format("isTemporary=[%s]", isTemporary); } - }, null, TableSchema.builder().build(), false); + }, null, tableSchema, false); this.fullyQualifiedPath = fullyQualifiedPath; this.isTemporary = isTemporary;
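A note on the central pattern in this change (not part of the commit itself): the planner now resolves each logical column of the catalog table's TableSchema to an index in the TableSource's produced type, going through the optional DefinedFieldMapping and substituting marker values for proctime/rowtime attributes, via TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers. Below is a simplified, Flink-free Scala sketch of that mapping logic; the marker constants, helper name, and example field names are hypothetical and only illustrate the idea.

import java.util.function.{Function => JFunction}

object PhysicalIndexSketch {

  // Stand-ins for the TimeIndicatorTypeInfo.*_MARKER constants used by the real code.
  val ProctimeMarker: Int = -1
  val RowtimeMarker: Int = -2

  // Maps logical (catalog) column names to indices in the physical produced type.
  // Time attributes have no physical field and are replaced by marker values,
  // mirroring what computePhysicalIndicesOrTimeAttributeMarkers does.
  def computeIndices(
      logicalColumns: Seq[String],
      physicalFields: Seq[String],
      proctime: Option[String],
      rowtime: Option[String],
      nameMapping: JFunction[String, String]): Seq[Int] = {
    logicalColumns.map {
      case name if proctime.contains(name) => ProctimeMarker
      case name if rowtime.contains(name) => RowtimeMarker
      case name =>
        val physicalName = Option(nameMapping.apply(name)).getOrElse(name)
        val idx = physicalFields.indexOf(physicalName)
        if (idx < 0) {
          throw new IllegalArgumentException(
            s"Field '$name' could not be resolved to a physical field.")
        }
        idx
    }
  }

  def main(args: Array[String]): Unit = {
    // Catalog schema: (id, name, rowtime, proctime); physical produced type: (f0, f1).
    val mapping = new JFunction[String, String] {
      override def apply(t: String): String =
        Map("id" -> "f0", "name" -> "f1").getOrElse(t, t)
    }
    val indices = computeIndices(
      Seq("id", "name", "rowtime", "proctime"),
      Seq("f0", "f1"),
      proctime = Some("proctime"),
      rowtime = Some("rowtime"),
      mapping)
    println(indices) // List(0, 1, -2, -1)
  }
}

In the actual commit the markers come from TimeIndicatorTypeInfo (e.g. PROCTIME_STREAM_MARKER / ROWTIME_STREAM_MARKER) and the mapping function is built from DefinedFieldMapping.getFieldMapping, as the nameMapping blocks in the hunks above show.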
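Similarly, the new deriveRowType() bodies in the scan nodes no longer ask TableSourceUtil to recompute the schema from the TableSource; they simply project the row type already derived by TableSourceTable.getRowType over the pushed-down selectedFields. A rough stand-alone sketch of that projection step (hypothetical Field/Schema case classes, not the Calcite RelDataType API):

object RowTypeProjectionSketch {
  final case class Field(name: String, dataType: String)
  final case class Schema(fields: Vector[Field])

  // Projects a base schema onto the selected field indices, or returns it unchanged when
  // no projection was pushed down — the same shape as the new deriveRowType() bodies that
  // rebuild the row type from table.getRowType and selectedFields.
  def projectSchema(base: Schema, selectedFields: Option[Array[Int]]): Schema =
    selectedFields
      .map(idxs => Schema(idxs.map(base.fields(_)).toVector))
      .getOrElse(base)

  def main(args: Array[String]): Unit = {
    val base = Schema(Vector(
      Field("id", "BIGINT"),
      Field("name", "STRING"),
      Field("rowtime", "TIMESTAMP(3)")))
    // A pushed-down projection keeping only (rowtime, id):
    println(projectSchema(base, Some(Array(2, 0))))
    // No projection pushed down:
    println(projectSchema(base, None))
  }
}

In the commit itself the same projection is performed with the Calcite type factory builder over table.getRowType, as shown in the BatchTableSourceScan, StreamTableSourceScan, and FlinkLogicalTableSourceScan hunks.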