[ https://issues.apache.org/jira/browse/FLINK-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088275#comment-17088275 ]
Zhenghua Gao edited comment on FLINK-16160 at 4/21/20, 6:04 AM: ---------------------------------------------------------------- The root causes are: * proctime()/rowtime() are used along with DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. The original code path stores the ConnectorCatalogTable object in Catalog and in validate phase, the RowType is derived from ConnectorCatalogTable.getSchema which contains time indicator. After FLINK-14490, we store CatalogTableImpl object in Catalog and in validate phrase, the RowType is derived from CatalogTableImpl.getSchema which doesn't contain time indicator. * In SqlToRel phase, FlinkCalciteCatalogReader converts ConnectorCatalogTable to TableSourceTable and converts CatalogTable to CatalogSourceTable. The TableSourceTable would be converted to LogicalTableScan directly and contains time indicator. Otherwise the CatalogSourceTable would be converted to a LogicalTableScan whose time indicator is erased(by FLINK-16345). The solution is straightforward: * We should instantiate the TableSource in CatalogSchemaTable and check if it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If so, rewrite the TableSchema to patch the time indicator(as it is in ConnectorCatalogTable#calculateSourceSchema). This will pass the validation. * Avoid erasing time indicator in CatalogSourceTable if the TableSource is a DefinedRowtimeAttributes/DefinedProctimeAttribute instance was (Author: docete): The root causes are: * proctime()/rowtime() are used along with DefinedRowtimeAttributes/DefinedProctimeAttribute and ConnectorCatalogTable. The original code path stores the ConnectorCatalogTable object in Catalog and in validate phrase, the RowType is derived from ConnectorCatalogTable.getSchema which contains time indicator. After FLINK-14490, we store CatalogTableImpl object in Catalog and in validate phrase, the RowType is derived from CatalogTableImpl.getSchema which doesn't contain time indicator. * In SqlToRel phrase, FlinkCalciteCatalogReader converts ConnectorCatalogTable to TableSourceTable and converts CatalogTable to CatalogSourceTable. The TableSourceTable would be converted to LogicalTableScan directly and contains time indicator. Otherwise the CatalogSourceTable would be converted to a LogicalTableScan whose time indicator is erased(by FLINK-16345). The solution is straightforward: * We should instantiate the TableSource in CatalogSchemaTable and check if it's a DefinedRowtimeAttributes/DefinedProctimeAttribute instance. If so, rewrite the TableSchema to patch the time indicator(as it is in ConnectorCatalogTable#calculateSourceSchema). This will pass the validation. * Avoid erasing time indicator in CatalogSourceTable if the TableSource is a DefinedRowtimeAttributes/DefinedProctimeAttribute instance > Schema#proctime and Schema#rowtime don't work in TableEnvironment#connect > code path > ----------------------------------------------------------------------------------- > > Key: FLINK-16160 > URL: https://issues.apache.org/jira/browse/FLINK-16160 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API > Reporter: Zhenghua Gao > Assignee: Zhenghua Gao > Priority: Critical > Labels: pull-request-available > Fix For: 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > In ConnectTableDescriptor#createTemporaryTable, the proctime/rowtime > properties are ignored so the generated catalog table is not correct. We > should fix this to let TableEnvironment#connect() support watermark. -- This message was sent by Atlassian Jira (v8.3.4#803005)