This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push: new ffd2bdf8 [FLINK-34014][jdbc] Avoid executeBatch when buffer is empty ffd2bdf8 is described below commit ffd2bdf887d997a6080f3a3bdba31a92b969e1e0 Author: Jeyhun Karimov <je.kari...@gmail.com> AuthorDate: Tue Mar 26 15:10:32 2024 +0100 [FLINK-34014][jdbc] Avoid executeBatch when buffer is empty --- .../TableBufferReducedStatementExecutor.java | 20 +++++++++++--------- .../executor/TableBufferedStatementExecutor.java | 10 ++++++---- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java index 8be2059b..fd062ccc 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java @@ -86,17 +86,19 @@ public final class TableBufferReducedStatementExecutor @Override public void executeBatch() throws SQLException { - for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) { - if (entry.getValue().f0) { - upsertExecutor.addToBatch(entry.getValue().f1); - } else { - // delete by key - deleteExecutor.addToBatch(entry.getKey()); + if (!reduceBuffer.isEmpty()) { + for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) { + if (entry.getValue().f0) { + upsertExecutor.addToBatch(entry.getValue().f1); + } else { + // delete by key + deleteExecutor.addToBatch(entry.getKey()); + } } + upsertExecutor.executeBatch(); + deleteExecutor.executeBatch(); + reduceBuffer.clear(); } - upsertExecutor.executeBatch(); - deleteExecutor.executeBatch(); - reduceBuffer.clear(); } @Override diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java index 89bcca4c..7e5e50ec 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java @@ -52,11 +52,13 @@ public final class TableBufferedStatementExecutor implements JdbcBatchStatementE @Override public void executeBatch() throws SQLException { - for (RowData value : buffer) { - statementExecutor.addToBatch(value); + if (!buffer.isEmpty()) { + for (RowData value : buffer) { + statementExecutor.addToBatch(value); + } + statementExecutor.executeBatch(); + buffer.clear(); } - statementExecutor.executeBatch(); - buffer.clear(); } @Override