shizhengchao created FLINK-19723:
------------------------------------

Summary: The retry mechanism of jdbc connector has the risk of data duplication
Key: FLINK-19723
URL: https://issues.apache.org/jira/browse/FLINK-19723
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.11.1
Reporter: shizhengchao
For example, suppose statement.executeBatch() fails for some reason while the connection is still valid. In the retry loop below, closeStatements() and prepareStatements() are only called when the connection has to be re-established, so in this case the statement is neither closed nor re-prepared. The next attempt reuses a statement that may still hold the rows batched by the failed attempt, which risks writing duplicate data:

{code:java}
for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
    try {
        attemptFlush();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        if (i >= executionOptions.getMaxRetries()) {
            throw new IOException(e);
        }
        try {
            // Statements are only closed and re-prepared when the connection is invalid;
            // on a still-valid connection the next attempt reuses the old statement.
            if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                connection = connectionProvider.reestablishConnection();
                jdbcStatementExecutor.closeStatements();
                jdbcStatementExecutor.prepareStatements(connection);
            }
        } catch (Exception exception) {
            LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
            throw new IOException("Reestablish JDBC connection failed", exception);
        }
        try {
            // Wait 1 s, 2 s, 3 s, ... before the next attempt.
            Thread.sleep(1000 * i);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IOException("unable to flush; interrupted while doing another attempt", e);
        }
    }
}
{code}

The correct code should re-establish the connection only when it is invalid, but close and re-prepare the statements on every retry:

{code:java}
try {
    if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
        connection = connectionProvider.reestablishConnection();
    }
    // Always discard the old statement (and anything still batched in it)
    // and prepare a fresh one before the next attempt.
    jdbcStatementExecutor.closeStatements();
    jdbcStatementExecutor.prepareStatements(connection);
} catch (Exception exception) {
    LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
    throw new IOException("Reestablish JDBC connection failed", exception);
}
{code}
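A minimal standalone Java sketch can reproduce the scenario without a database. FakeStatement, attemptFlush and flush here are hypothetical stand-ins, not Flink or JDBC APIs. The model makes two assumptions: a failed executeBatch() leaves the earlier addBatch() rows queued in the statement (this is driver-dependent), and the buffered records are re-added to the statement on every attempt. It also omits the sleep between attempts and throws an unchecked exception instead of IOException to keep the demo short.

{code:java}
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class RetryDuplicationDemo {

    /** Hypothetical stand-in for a JDBC PreparedStatement; not a real driver. */
    static class FakeStatement {
        private final List<String> pendingBatch = new ArrayList<>();
        private final List<String> table;
        private boolean failNextExecute;

        FakeStatement(List<String> table, boolean failNextExecute) {
            this.table = table;
            this.failNextExecute = failNextExecute;
        }

        void addBatch(String row) {
            pendingBatch.add(row);
        }

        void executeBatch() throws SQLException {
            if (failNextExecute) {
                failNextExecute = false;
                // Assumption: the pending batch is NOT cleared when execution fails.
                throw new SQLException("simulated transient failure");
            }
            table.addAll(pendingBatch);
            pendingBatch.clear();
        }
    }

    /** Hypothetical attemptFlush(): re-add every buffered record, then execute. */
    static void attemptFlush(FakeStatement statement, List<String> buffer) throws SQLException {
        for (String row : buffer) {
            statement.addBatch(row);
        }
        statement.executeBatch();
    }

    /**
     * Runs the retry loop against a connection that stays valid and returns the
     * rows written to the table. With reprepareOnEveryRetry = false the statement
     * is only rebuilt when the connection is invalid (the reported behaviour);
     * with true it is rebuilt on every retry (the proposed fix).
     */
    public static List<String> flush(List<String> buffer, int maxRetries, boolean reprepareOnEveryRetry) {
        List<String> table = new ArrayList<>();
        FakeStatement statement = new FakeStatement(table, true); // first execute fails
        boolean connectionValid = true; // assumption: the failure was not a lost connection
        for (int i = 1; i <= maxRetries; i++) {
            try {
                attemptFlush(statement, buffer);
                break;
            } catch (SQLException e) {
                if (i >= maxRetries) {
                    throw new IllegalStateException("unable to flush after " + maxRetries + " attempts", e);
                }
                if (!connectionValid || reprepareOnEveryRetry) {
                    // Models closeStatements() + prepareStatements(): start from an empty batch.
                    statement = new FakeStatement(table, false);
                }
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> buffer = List.of("a", "b", "c");
        System.out.println("reported loop: " + flush(buffer, 3, false)); // [a, b, c, a, b, c]
        System.out.println("proposed fix:  " + flush(buffer, 3, true));  // [a, b, c]
    }
}
{code}

Under these assumptions, the loop as reported writes six rows for three buffered records, while re-preparing the statement on every retry writes each record once.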