shizhengchao created FLINK-19723:
------------------------------------

             Summary: The retry mechanism of jdbc connector has the risk of 
data duplication
                 Key: FLINK-19723
                 URL: https://issues.apache.org/jira/browse/FLINK-19723
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.11.1
            Reporter: shizhengchao


for example,  if statement.executeBatch() failur for some reason, but the 
"statement" was not closed in the retry, there is a risk of data duplication:
{code:java}
for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
        try {
                attemptFlush();
                batchCount = 0;
                break;
        } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
                if (i >= executionOptions.getMaxRetries()) {
                        throw new IOException(e);
                }
                try {
                        if 
(!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                                connection = 
connectionProvider.reestablishConnection();
                                jdbcStatementExecutor.closeStatements();
                                
jdbcStatementExecutor.prepareStatements(connection);
                        }
                } catch (Exception excpetion) {
                        LOG.error("JDBC connection is not valid, and 
reestablish connection failed.", excpetion);
                        throw new IOException("Reestablish JDBC connection 
failed", excpetion);
                }
                try {
                        Thread.sleep(1000 * i);
                } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new IOException("unable to flush; interrupted 
while doing another attempt", e);
                }
        }
}
{code}

the correct code should be:
{code:java}
try {
                        if 
(!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
                                connection = 
connectionProvider.reestablishConnection();
                        }
                        jdbcStatementExecutor.closeStatements();
                        jdbcStatementExecutor.prepareStatements(connection);
                } catch (Exception excpetion) {
                        LOG.error("JDBC connection is not valid, and 
reestablish connection failed.", excpetion);
                        throw new IOException("Reestablish JDBC connection 
failed", excpetion);
                }
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to