[ 
https://issues.apache.org/jira/browse/FLINK-16708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shangwen Tang updated FLINK-16708:
----------------------------------
    Description: 
In our test environment, I used the tcpkill command to simulate a scenario 
where the PostgreSQL connection was closed. I found that the retry strategy of 
the flush method did not take effect:
{code:java}
2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at 
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at 
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
{code}

  was:
In our test environment, I used the tcpkill command to simulate a scenario 
where the PostgreSQL connection was closed. I found that the retry strategy of 
the flush method did not take effect. When it retried the second time, it 
could not recognize that the connection had been closed: before the first 
check of whether the connection is closed, the batchStatements of PgStatement 
have already been cleared, so the second execution sees that batchStatements 
is empty and returns normally.
{code:java}
2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at 
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
 2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
error, retry times = 1
 org.postgresql.util.PSQLException: This connection has been closed.
         at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
         at 
org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
         at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
         at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
         at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
         at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
         at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
         at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
{code}


> When a JDBC connection has been closed, the retry policy of the 
> JDBCUpsertOutputFormat cannot take effect 
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16708
>                 URL: https://issues.apache.org/jira/browse/FLINK-16708
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.0
>            Reporter: Shangwen Tang
>            Assignee: Shangwen Tang
>            Priority: Major
>
> In our test environment, I used the tcpkill command to simulate a scenario 
> where the PostgreSQL connection was closed. I found that the retry strategy 
> of the flush method did not take effect:
> {code:java}
> 2020-03-20 21:16:18.246 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
> error, retry times = 1
>  org.postgresql.util.PSQLException: This connection has been closed.
>          at 
> org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>          at 
> org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>          at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>          at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>          at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>          at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>          at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>          at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>          at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>          at java.lang.Thread.run(Thread.java:748)
>  2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC executeBatch 
> error, retry times = 1
>  org.postgresql.util.PSQLException: This connection has been closed.
>          at 
> org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
>          at 
> org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
>          at 
> org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
>          at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
>          at 
> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
>          at 
> org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
>          at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>          at 
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
>          at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>          at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to