[ 
https://issues.apache.org/jira/browse/FLINK-23324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381186#comment-17381186
 ] 

Ada Wong edited comment on FLINK-23324 at 7/15/21, 8:57 AM:
------------------------------------------------------------

The following is the Postgres table DDL:

{code:sql}
create table foo
(
    id        integer,
    "foo_Bar" varchar(40)
);
{code}

The following is the Flink sink DDL:
{code:sql}
CREATE TABLE pg_sink (
  id INT,
  foo_Bar STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://localhost:5432/postgres',
   'table-name' = 'foo',
   'username' = 'postgres',
   'password' = 'postgres'
);
{code}

When we use Flink SQL to insert into Postgres, it throws the following exception.


2021-07-15 16:50:18,719 - 17170 ERROR [jdbc-upsert-output-format-thread-1] org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat:JDBC executeBatch error, retry times = 3
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO foo(id, foo_Bar) VALUES (0, 'foo') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, foo_Bar=EXCLUDED.foo_Bar was aborted: ERROR: column "foo_bar" of relation "foo" does not exist
  Position: 21  Call getNextException to see other errors in the batch.
        at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:199)
        at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:863)
        at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901)
        at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
        at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
        at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
        at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: column "foo_bar" of relation "foo" does not exist
  Position: 21
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
        at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)
        ... 15 more
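
For context, here is a minimal sketch of why the statement above fails. It assumes only the standard Postgres identifier rules: an unquoted identifier is folded to lower case, while a double-quoted one keeps its case. The class IdentifierQuoting and its methods resolve and quote are hypothetical names used for illustration; they are not Flink or PostgreSQL JDBC driver APIs.

```java
import java.util.Locale;

/** Hypothetical helper (not part of Flink): mimics Postgres identifier handling. */
final class IdentifierQuoting {

    /** Returns the column name Postgres would look up for an identifier as written in SQL. */
    static String resolve(String written) {
        if (written.length() >= 2 && written.startsWith("\"") && written.endsWith("\"")) {
            // Quoted identifier: case is preserved; a doubled quote is an escaped quote.
            return written.substring(1, written.length() - 1).replace("\"\"", "\"");
        }
        // Unquoted identifier: folded to lower case.
        return written.toLowerCase(Locale.ROOT);
    }

    /** Wraps an identifier in double quotes, doubling any embedded quote. */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        // Unquoted, as in the failing INSERT: looked up as foo_bar, which does not exist.
        System.out.println(resolve("foo_Bar"));
        // Quoted: looked up as foo_Bar, which matches the Postgres table DDL.
        System.out.println(resolve(quote("foo_Bar")));
    }
}
```

Under these assumptions, an INSERT that emits the column as "foo_Bar" (with quotes) would match the column created by the Postgres DDL, whereas the unquoted form resolves to foo_bar, as reported in the error.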


> Postgres of JDBC Connector enable case-sensitive.
> -------------------------------------------------
>
>                 Key: FLINK-23324
>                 URL: https://issues.apache.org/jira/browse/FLINK-23324
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.13.1, 1.12.4
>            Reporter: Ada Wong
>            Priority: Major
>
> Currently the PostgresDialect is case-insensitive. I think this is a bug.
> https://stackoverflow.com/questions/20878932/are-postgresql-column-names-case-sensitive
> https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
> Could we delete PostgresDialect#quoteIdentifier so that it uses the superclass implementation?



