[ 
https://issues.apache.org/jira/browse/FLINK-34935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Du Yuzhou updated FLINK-34935:
------------------------------
    Description: 
When I use JdbcCatalog to select from postgres table, it throw Exception:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported 
type:TIMESTAMP_LTZ(6)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
    at 
org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50)
    at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
    at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
    at 
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
    at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
    at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
    at 
org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80)
    at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
    at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
    at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
    at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76)
    at com.demo.DataflowJob.main(DataflowJob.java:110) {code}
 

The source table is a postgres table, which schema is:
{code:java}
create table t_test (
 id integer primary key,
 name timestamptz
);{code}
 

I've read the relevant code and roughly located where the problem is. Can I 
make some code contribution to fix this problem?

  was:
When I use JdbcCatalog to select from postgres table, it throw Exception:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported 
type:TIMESTAMP_LTZ(6)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
    at 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
    at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
    at 
org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50)
    at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
    at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
    at 
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
    at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
    at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
    at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
    at 
org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80)
    at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
    at 
org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
    at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
    at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
    at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76)
    at com.demo.DataflowJob.main(DataflowJob.java:110) {code}
 

The source table is a postgres table, which schema is:
{code:java}
create table t_test (
 id integer primary key,
 name timestamptz
);{code}
 

I've read the relevant code and roughly located where the problem is. Can I 
create some corresponding pr to fix this problem?


> TIMESTAMP_LTZ type Unsupported when using JdbcCatalog to read from Postgres
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-34935
>                 URL: https://issues.apache.org/jira/browse/FLINK-34935
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.1
>         Environment: flink: 1.17.0
> flink-connector-jdbc: 3.1.0-1.17
> postgres: 14.5
> java: 11
>            Reporter: Du Yuzhou
>            Priority: Major
>              Labels: flink, jdbc_connector, postgres
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> When I use JdbcCatalog to select from postgres table, it throw Exception:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: 
> Unsupported type:TIMESTAMP_LTZ(6)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
>     at 
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
>     at 
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
>     at 
> org.apache.flink.connector.jdbc.dialect.psql.PostgresDialect.getRowConverter(PostgresDialect.java:50)
>     at 
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:182)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
>     at 
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
>     at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
>     at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
>     at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
>     at 
> org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
>     at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
>     at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
>     at 
> org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
>     at 
> org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
>     at 
> org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable.convertToRel(QueryOperationCatalogViewTable.java:80)
>     at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:70)
>     at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:57)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
>     at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
>     at com.demo.flow.ExportCsvFlow.run(ExportCsvFlow.java:76)
>     at com.demo.DataflowJob.main(DataflowJob.java:110) {code}
>  
> The source table is a postgres table, which schema is:
> {code:java}
> create table t_test (
>  id integer primary key,
>  name timestamptz
> );{code}
>  
> I've read the relevant code and roughly located where the problem is. Can I 
> make some code contribution to fix this problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to