Hi,
I have a piece of Flink Streaming code that reads data from files and
inserts the records into a PostgreSQL table. After inserting 6 to 11 million
records, I got the following errors:
Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
	at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
	at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
	at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
	at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	... 15 more
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) VALUES ('SST', 'XC_XC', '2015-04-11 21:36:23+03', 12.0, '1.00', 'FALSE', '2020-01-23 19:22:14.469+03', 'system') ON CONFLICT DO NOTHING was aborted: An I/O error occurred while sending to the backend. Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:515)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:853)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
	at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
	... 21 more
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:516)
	... 24 more
Caused by: java.io.EOFException
	at org.postgresql.core.PGStream.receiveChar(PGStream.java:337)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2000)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:510)
	... 24 more

However, since I have enabled a restart strategy, the app is automatically
restarted and reconnects to the database.
My code simply reads data from files, transforms the records into the table
schema, and inserts the rows into the table.
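
For reference, the job is structured roughly like the sketch below (driver URL, file path, and the `toRow` parsing are placeholders, not my actual code); it uses the `JDBCOutputFormat` that appears in the stack trace:

```java
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class CsvToPostgresJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.readTextFile("/path/to/input")            // read data from files
           .map(CsvToPostgresJob::toRow)              // transform to the table schema
           .writeUsingOutputFormat(
               JDBCOutputFormat.buildJDBCOutputFormat()
                   .setDrivername("org.postgresql.Driver")
                   .setDBUrl("jdbc:postgresql://host:5432/db")
                   .setQuery("INSERT INTO csv_data(asset, tag, t, q, v, "
                           + "backfill, createdAt, createdBy) "
                           + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) "
                           + "ON CONFLICT DO NOTHING")
                   .setBatchInterval(5000)            // rows buffered before flush()
                   .finish());

        env.execute("csv-to-postgres");
    }

    // Placeholder parser: maps one CSV line to an 8-field Row
    // matching the csv_data columns.
    private static Row toRow(String line) {
        String[] fields = line.split(",");
        Row row = new Row(8);
        for (int i = 0; i < 8; i++) {
            row.setField(i, fields[i]);
        }
        return row;
    }
}
```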

It would be great if anyone could help me with this.
Thanks
