[ https://issues.apache.org/jira/browse/FLINK-19723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17222726#comment-17222726 ]
shizhengchao commented on FLINK-19723:
--------------------------------------

Hi [~jark], after testing, this turns out not to be a bug. Could you close the issue? :D
Generally, once executeBatch has run in JDBC, the batch queued on the PreparedStatement is cleared, regardless of success or failure.

> The retry mechanism of jdbc connector has the risk of data duplication
> ----------------------------------------------------------------------
>
>                 Key: FLINK-19723
>                 URL: https://issues.apache.org/jira/browse/FLINK-19723
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.11.1
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>              Labels: pull-request-available
>
> For example, if statement.executeBatch() fails for some reason but the
> "statement" is not closed before the retry, there is a risk of data duplication:
> {code:java}
> for (int i = 1; i <= executionOptions.getMaxRetries(); i++) {
>     try {
>         attemptFlush();
>         batchCount = 0;
>         break;
>     } catch (SQLException e) {
>         LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>         if (i >= executionOptions.getMaxRetries()) {
>             throw new IOException(e);
>         }
>         try {
>             if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
>                 connection = connectionProvider.reestablishConnection();
>                 jdbcStatementExecutor.closeStatements();
>                 jdbcStatementExecutor.prepareStatements(connection);
>             }
>         } catch (Exception excpetion) {
>             LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
>             throw new IOException("Reestablish JDBC connection failed", excpetion);
>         }
>         try {
>             Thread.sleep(1000 * i);
>         } catch (InterruptedException ex) {
>             Thread.currentThread().interrupt();
>             throw new IOException("unable to flush; interrupted while doing another attempt", e);
>         }
>     }
> }
> {code}
> The correct code should be:
> {code:java}
> try {
>     if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
>         connection = connectionProvider.reestablishConnection();
>     }
>     jdbcStatementExecutor.closeStatements();
>     jdbcStatementExecutor.prepareStatements(connection);
> } catch (Exception excpetion) {
>     LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
>     throw new IOException("Reestablish JDBC connection failed", excpetion);
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
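
The behaviour described in the comment (the batch is emptied once executeBatch finishes, whether it succeeded or failed) can be illustrated with a toy model. This is only a sketch under that assumption: ToyStatement, failNextExecute and pending are hypothetical names invented for illustration, not JDBC or Flink classes, and a real driver's behaviour should be checked against its own documentation.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyStatement is hypothetical and is not a JDBC driver class.
final class BatchResetSketch {

    static final class ToyStatement {
        private final List<String> batch = new ArrayList<>();   // rows queued via addBatch
        private final List<String> written = new ArrayList<>(); // rows the "database" accepted
        private boolean failNext;

        void addBatch(String row) { batch.add(row); }

        // Make the next executeBatch call throw, to simulate a failed flush.
        void failNextExecute() { failNext = true; }

        // Assumption being modelled: the batch is emptied whether this returns or throws.
        int executeBatch() throws SQLException {
            try {
                if (failNext) {
                    failNext = false;
                    throw new SQLException("simulated executeBatch failure");
                }
                written.addAll(batch);
                return batch.size();
            } finally {
                batch.clear();
            }
        }

        int pending() { return batch.size(); }

        List<String> written() { return written; }
    }

    public static void main(String[] args) {
        ToyStatement stmt = new ToyStatement();
        stmt.addBatch("row-1");
        stmt.addBatch("row-2");
        stmt.failNextExecute();
        try {
            stmt.executeBatch();
        } catch (SQLException e) {
            System.out.println("first attempt failed, pending = " + stmt.pending());
        }
        try {
            int sent = stmt.executeBatch(); // retry on the same statement object
            System.out.println("retry sent " + sent + " rows");
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this model, a retry that calls executeBatch again on the same statement finds nothing queued, so no rows from the failed attempt are sent a second time. That is the reasoning behind the comment's conclusion that the retry loop does not duplicate data.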