[ 
https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-17600.
---------------------------
    Resolution: Duplicate

Closing since this is fixed by FLINK-16160.

> Blink Planner fails to generate RowtimeAttribute based on TableSource's 
> DefinedRowtimeAttributes implementation
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17600
>                 URL: https://issues.apache.org/jira/browse/FLINK-17600
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Yuval Itzchakov
>            Priority: Major
>
> Given the following SQL statement: 
> {code:java}
> tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code}
> Where FOO is a table originating from a custom StreamTableSource[Row] which
> implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink
> Planner fails to mark the selected field with a `RowtimeAttribute`.
> This happens because the implementation of `TableSourceUtil.getSourceRowType`
> receives a `None` TableSource from `CatalogSchemaTable.getRowType`,
> presumably because the Catalog has yet to create the underlying TableSource,
> which is deferred to the implementing TableFactory (in this case my own custom
> one).
> *This does not reproduce in the old Flink planner*, because the old planner
> uses `TableSourceTable`, which explicitly holds a reference to the underlying
> `TableSource` and extracts its rowtime attributes.
> Relevant code:
> *CatalogSchemaTable*:
>  
> {code:java}
> private static RelDataType getRowType(RelDataTypeFactory typeFactory,
>       CatalogBaseTable catalogBaseTable,
>       boolean isStreamingMode) {
>    final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
>    TableSchema tableSchema = catalogBaseTable.getSchema();
>    final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
>    if (!isStreamingMode
>       && catalogBaseTable instanceof ConnectorCatalogTable
>       && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
>       // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
>       // Now for ConnectorCatalogTable, there is no way to
>       // deduce if it is bounded in the table environment, so the data types in TableSchema
>       // always patched with TimeAttribute.
>       // See ConnectorCatalogTable#calculateSourceSchema
>       // for details.
>       // Remove the patched time attributes type to let the TableSourceTable handle it.
>       // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
>       // TODO: Fix FLINK-14844.
>       for (int i = 0; i < fieldDataTypes.length; i++) {
>          LogicalType lt = fieldDataTypes[i].getLogicalType();
>          if (lt instanceof TimestampType
>             && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
>             || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
>             int precision = ((TimestampType) lt).getPrecision();
>             fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
>          }
>       }
>    }
>    return TableSourceUtil.getSourceRowType(flinkTypeFactory,
>       tableSchema,
>       scala.Option.empty(),
>       isStreamingMode);
> }
> {code}
> *TableSourceUtil:*
>  
>  
> {code:scala}
> def getSourceRowType(
>     typeFactory: FlinkTypeFactory,
>     tableSchema: TableSchema,
>     tableSource: Option[TableSource[_]],
>     streaming: Boolean): RelDataType = {
>   val fieldNames = tableSchema.getFieldNames
>   val fieldDataTypes = tableSchema.getFieldDataTypes
>   if (tableSchema.getWatermarkSpecs.nonEmpty) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes,
>       tableSchema.getWatermarkSpecs.head, streaming)
>   } else if (tableSource.isDefined) {
>     getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get,
>       streaming)
>   } else {
>     val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
>     typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
>   }
> }{code}
> *TableSourceTable:*
> {code:scala}
>   // We must enrich logical schema from catalog table with physical type coming from table source.
>   // Schema coming from catalog table might not have proper conversion classes. Those must be
>   // extracted from produced type, before converting to RelDataType
>   def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
>     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>
>     val fieldNames = tableSchema.getFieldNames
>
>     val nameMapping: JFunction[String, String] = tableSource match {
>       case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
>         new JFunction[String, String] {
>           override def apply(t: String): String = mapping.getFieldMapping.get(t)
>         }
>       case _ => JFunction.identity()
>     }
>
>     val producedDataType = tableSource.getProducedDataType
>     val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
>       tableSource,
>       tableSchema.getTableColumns,
>       isStreamingMode,
>       nameMapping
>     )
>
>     val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) {
>       val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType)
>       fieldIndexes.map(mapIndex(_,
>         idx =>
>           TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get()))
>       )
>     } else {
>       fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType)))
>     }
>
>     flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos)
>   }
> {code}
>  
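
The three-way dispatch in the quoted `getSourceRowType` (watermark spec first, then the table source, then a plain fallback) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `RowTypeSketch`, `Source`, `rowtimeField` and `fieldKind` are hypothetical names invented for illustration, none of them are Flink classes or methods, and the string results stand in for the real `RelDataType` field types. It is meant to show why passing an empty source, as `scala.Option.empty()` does in the quoted `CatalogSchemaTable.getRowType`, would leave the field unmarked when no watermark spec is present.

```java
import java.util.Optional;

// Toy model only: none of these types are Flink classes.
final class RowTypeSketch {

    /** Hypothetical stand-in for a table source that may declare a rowtime field. */
    interface Source {
        Optional<String> rowtimeField();
    }

    /**
     * Simplified version of the dispatch order in the quoted getSourceRowType:
     * watermark spec first, then the table source, then a plain fallback.
     * Returns how the given field would be typed in this toy model.
     */
    static String fieldKind(String field,
                            Optional<String> watermarkField,
                            Optional<Source> source) {
        if (watermarkField.isPresent()) {
            return watermarkField.get().equals(field) ? "ROWTIME" : "REGULAR";
        }
        if (source.isPresent()) {
            boolean declared = source.get().rowtimeField().filter(field::equals).isPresent();
            return declared ? "ROWTIME" : "REGULAR";
        }
        // Fallback branch: there is no source to consult, so nothing is marked.
        return "REGULAR";
    }

    public static void main(String[] args) {
        Source custom = () -> Optional.of("EVENT_TIME");
        // Source handed through to the dispatch.
        System.out.println(fieldKind("EVENT_TIME", Optional.empty(), Optional.of(custom)));
        // Source dropped before the dispatch, as in the reported code path.
        System.out.println(fieldKind("EVENT_TIME", Optional.empty(), Optional.empty()));
    }
}
```

In this model the first call yields `ROWTIME` and the second yields `REGULAR`, which matches the symptom described in the report: the source's rowtime declaration exists but is never consulted.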



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
