Fawad Halim created FLINK-14524:
-----------------------------------

             Summary: PostgreSQL JDBC sink generates invalid SQL in upsert mode
                 Key: FLINK-14524
                 URL: https://issues.apache.org/jira/browse/FLINK-14524
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.9.1, 1.10.0
            Reporter: Fawad Halim


The "upsert" query generated for the PostgreSQL dialect is missing a closing 
parenthesis in the ON CONFLICT clause, causing the INSERT statement to error 
out with the error

 

{{ERROR o.a.f.s.runtime.tasks.StreamTask - Error during disposal of stream 
operator.}}
{{java.lang.RuntimeException: Writing records to JDBC failed.}}
{{ at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135)}}
{{ at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.close(JDBCUpsertOutputFormat.java:184)}}
{{ at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.close(JDBCUpsertSinkFunction.java:61)}}
{{ at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)}}
{{ at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:585)}}
{{ at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:484)}}
{{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)}}
{{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}}
{{ at java.lang.Thread.run(Thread.java:748)}}
{{Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO 
"public.temperature"("id", "timestamp", "temperature") VALUES ('sensor_17', 
'2019-10-25 00:39:10-05', 20.27573964210997) ON CONFLICT ("id", "timestamp" DO 
UPDATE SET "id"=EXCLUDED."id", "timestamp"=EXCLUDED."timestamp", 
"temperature"=EXCLUDED."temperature" was aborted: ERROR: syntax error at or 
near "DO"}}
{{ Position: 119 Call getNextException to see other errors in the batch.}}
{{ at 
org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:163)}}
{{ at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:838)}}
{{ at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)}}
{{ at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingUpsertStatement.internalExecuteBatch(UpsertWriter.java:177)}}
{{ at 
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:117)}}
{{ at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)}}
{{ at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)}}
{{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
{{ at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)}}
{{ at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)}}
{{ at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}}
{{ ... 1 common frames omitted}}
{{Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near 
"DO"}}
{{ Position: 119}}
{{ at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)}}
{{ at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)}}
{{ at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)}}
{{ at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)}}
{{ ... 12 common frames omitted}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to