??????????????????JDBC????postgresql??????????tcpkill????????????????????????????????????????????????????????????????????
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1 org.postgresql.util.PSQLException: This connection has been closed. at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857) at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817) at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813) at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873) at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569) at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ??????????????????????????????????????????????????Flink?????????? //JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception { checkFlushException(); for (int i = 1; i <= maxRetryTimes; i++) { try { jdbcWriter.executeBatch(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= maxRetryTimes) { throw e; } Thread.sleep(1000 * i); } } } ????????debug?????????????????? JDBCUpsertOutputFormat.flush -> AppendOnlyWriter.executeBatch ... -> PgConnection.getAutoCommit ????PSQLException: This connection has been closed????batchStatements?????????????????? // PgStatement.java private BatchResultHandler internalExecuteBatch() throws SQLException { // Construct query/parameter arrays. transformQueriesAndParameters(); // Empty arrays should be passed to toArray // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/ Query[] queries = batchStatements.toArray(new Query[0]); ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]); batchStatements.clear(); // ?????????????? batchParameters.clear(); ... if (connection.getAutoCommit()) { // ???????? flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } ... } ??????Flink????????????????????????????jdbcWriter.executeBatch????????batchStatements??????Empty??????????????????Flink???????????????????????????????????????????? // PgStatement.java public int[] executeBatch() throws SQLException { checkClosed(); closeForNextExecution(); if (batchStatements == null || batchStatements.isEmpty()) { //???????????????? return new int[0]; } return internalExecuteBatch().getUpdateCount(); } ????????????????????????????????????????????????????????????????????????????????????????????????????open????????????????????????????????issue https://issues.apache.org/jira/browse/FLINK-16708