??????????????????JDBC????postgresql??????????tcpkill????????????????????????????????????????????????????????????????????


2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC 
executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
        at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
        at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
        at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
        at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
        at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
        at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
        at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
        at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)



??????????????????????????????????????????????????Flink??????????
//JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception {
   checkFlushException();

   for (int i = 1; i <= maxRetryTimes; i++) {
      try {
         jdbcWriter.executeBatch();
         batchCount = 0;
         break;
      } catch (SQLException e) {
         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
         if (i &gt;= maxRetryTimes) {
            throw e;
         }
         Thread.sleep(1000 * i);
      }
   }
}


????????debug??????????????????
JDBCUpsertOutputFormat.flush
&nbsp; -&gt; AppendOnlyWriter.executeBatch
&nbsp; &nbsp; &nbsp;...
&nbsp; &nbsp; &nbsp;-&gt; PgConnection.getAutoCommit
????PSQLException: This connection has been 
closed????batchStatements??????????????????
// PgStatement.java private BatchResultHandler internalExecuteBatch() throws 
SQLException {   // Construct query/parameter arrays.   
transformQueriesAndParameters();   // Empty arrays should be passed to toArray  
 // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/   Query[] queries 
= batchStatements.toArray(new Query[0]);   ParameterList[] parameterLists = 
batchParameters.toArray(new ParameterList[0]);   batchStatements.clear(); // 
??????????????   batchParameters.clear();   ...   if 
(connection.getAutoCommit()) { // ????????   flags |= 
QueryExecutor.QUERY_SUPPRESS_BEGIN;   }   ... }


??????Flink????????????????????????????jdbcWriter.executeBatch????????batchStatements??????Empty??????????????????Flink????????????????????????????????????????????
// PgStatement.java public int[] executeBatch() throws SQLException {   
checkClosed();   closeForNextExecution();   if (batchStatements == null || 
batchStatements.isEmpty()) { //????????????????     return new int[0];   }   
return internalExecuteBatch().getUpdateCount(); }


????????????????????????????????????????????????????????????????????????????????????????????????????open????????????????????????????????issue
https://issues.apache.org/jira/browse/FLINK-16708

回复