[ https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-17600. --------------------------- Resolution: Duplicate close since it is fixed by FLINK-16160 > Blink Planner fails to generate RowtimeAttribute based on TableSource's > DefinedRowtimeAttributes implementation > --------------------------------------------------------------------------------------------------------------- > > Key: FLINK-17600 > URL: https://issues.apache.org/jira/browse/FLINK-17600 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.10.0 > Reporter: Yuval Itzchakov > Priority: Major > > Given the following SQL statement: > {code:java} > tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code} > Where FOO is a table originating from a custom StreamTableSource[Row] which > implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink > Planner fails to mark the selected field with a `RowtimeAttribute`. > This happens because `TableSourceUtil.getSourceRowType`'s implementation > receives a `None` TableSource from `CatalogSchemaTable.getRowType`, > presumably because the Catalog has yet to create the underlying TableSource > which is deferred to the implementing TableFactory (in this case my own custom > one). > *This does not reproduce in the old Flink planner*, because the old planner > uses `TableSourceTable` which explicitly holds a reference to the underlying > `TableSource` and extracts its rowtime attributes. 
> Relevant code: > *CatalogSchemaTable*: > > {code:java} > private static RelDataType getRowType(RelDataTypeFactory typeFactory, > CatalogBaseTable catalogBaseTable, > boolean isStreamingMode) { > final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; > TableSchema tableSchema = catalogBaseTable.getSchema(); > final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); > if (!isStreamingMode > && catalogBaseTable instanceof ConnectorCatalogTable > && ((ConnectorCatalogTable) > catalogBaseTable).getTableSource().isPresent()) { > // If the table source is bounded, materialize the time attributes to > normal TIMESTAMP type. > // Now for ConnectorCatalogTable, there is no way to > // deduce if it is bounded in the table environment, so the data types > in TableSchema > // always patched with TimeAttribute. > // See ConnectorCatalogTable#calculateSourceSchema > // for details. > // Remove the patched time attributes type to let the TableSourceTable > handle it. > // We should remove this logic if the isBatch flag in > ConnectorCatalogTable is fixed. > // TODO: Fix FLINK-14844. 
> for (int i = 0; i < fieldDataTypes.length; i++) { > LogicalType lt = fieldDataTypes[i].getLogicalType(); > if (lt instanceof TimestampType > && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME > || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) { > int precision = ((TimestampType) lt).getPrecision(); > fieldDataTypes[i] = DataTypes.TIMESTAMP(precision); > } > } > } > return TableSourceUtil.getSourceRowType(flinkTypeFactory, > tableSchema, > scala.Option.empty(), > isStreamingMode); > } > {code} > *TableSourceUtil:* > > > {code:java} > def getSourceRowType( > typeFactory: FlinkTypeFactory, > tableSchema: TableSchema, > tableSource: Option[TableSource[_]], > streaming: Boolean): RelDataType = { > val fieldNames = tableSchema.getFieldNames > val fieldDataTypes = tableSchema.getFieldDataTypes > if (tableSchema.getWatermarkSpecs.nonEmpty) { > getSourceRowType(typeFactory, fieldNames, fieldDataTypes, > tableSchema.getWatermarkSpecs.head, > streaming) > } else if (tableSource.isDefined) { > getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, > streaming) > } else { > val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType) > typeFactory.buildRelNodeRowType(fieldNames, fieldTypes) > } > }{code} > *TableSourceTable:* > {code:java} > // We must enrich logical schema from catalog table with physical type > coming from table source. > // Schema coming from catalog table might not have proper conversion > classes. 
Those must be > // extracted from produced type, before converting to RelDataType > def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val > flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val > fieldNames = tableSchema.getFieldNames val nameMapping: JFunction[String, > String] = tableSource match { > case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => > new JFunction[String, String] { > override def apply(t: String): String = > mapping.getFieldMapping.get(t) > } > case _ => JFunction.identity() > } val producedDataType = tableSource.getProducedDataType > val fieldIndexes = > TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( > tableSource, > tableSchema.getTableColumns, > isStreamingMode, > nameMapping > ) val typeInfos = if > (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) { > val physicalSchema = > DataTypeUtils.expandCompositeTypeToSchema(producedDataType) > fieldIndexes.map(mapIndex(_, > idx => > > TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())) > ) > } else { > fieldIndexes.map(mapIndex(_, _ => > TypeConversions.fromDataTypeToLegacyInfo(producedDataType))) > } flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos) > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)