[ https://issues.apache.org/jira/browse/FLINK-34935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Du Yuzhou updated FLINK-34935: ------------------------------ Description: When I use JdbcCatalog to select from postgres table, it throw Exception: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47) at org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481) at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738) at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76) at com.demo.DataflowJob.main(DataflowJob.java:110) {code} The source table is a postgres table, which schema is: {code:java} create table t_test ( id integer primary key, name timestamptz );{code} I've read the relevant code and roughly located where the problem is. Can I make some code contribution to fix this problem? was: When I use JdbcCatalog to select from postgres table, it throw Exception: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68) at org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47) at org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481) at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261) at org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70) at org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738) at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76) at com.demo.DataflowJob.main(DataflowJob.java:110) {code} The source table is a postgres table, which schema is: {code:java} create table t_test ( id integer primary key, name timestamptz );{code} I've read the relevant code and roughly located where the problem is. Can I create some corresponding pr to fix this problem? > TIMESTAMP_LTZ type Unsupported when using JdbcCatalog to read from Postgres > --------------------------------------------------------------------------- > > Key: FLINK-34935 > URL: https://issues.apache.org/jira/browse/FLINK-34935 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: 1.18.1 > Environment: flink: 1.17.0 > flink-connector-jdbc: 3.1.0-1.17 > postgres: 14.5 > java: 11 > Reporter: Du Yuzhou > Priority: Major > Labels: flink, jdbc_connector, postgres > Original Estimate: 72h > Remaining Estimate: 72h > > When I use JdbcCatalog to select from postgres table, it throw Exception: > {code:java} > Exception in thread "main" java.lang.UnsupportedOperationException: > Unsupported type:TIMESTAMP_LTZ(6) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186) > at > org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99) > at > org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68) > at > org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47) > at > org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50) > at > org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182) > at > org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466) > at > org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161) > at > org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) > at > org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366) > at > org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158) > at > org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155) > at > org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135) > at > org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) > at > org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) > at > org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261) > at > org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70) > at > org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738) > at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76) > at com.demo.DataflowJob.main(DataflowJob.java:110) {code} > > The source table is a postgres table, which schema is: > {code:java} > create table t_test ( > id integer primary key, > name timestamptz > );{code} > > I've read the relevant code and roughly located where the problem is. Can I > make some code contribution to fix this problem? -- This message was sent by Atlassian Jira (v8.20.10#820010)