This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new ffd2bdf8 [FLINK-34014][jdbc] Avoid executeBatch when buffer is empty
ffd2bdf8 is described below

commit ffd2bdf887d997a6080f3a3bdba31a92b969e1e0
Author: Jeyhun Karimov <je.kari...@gmail.com>
AuthorDate: Tue Mar 26 15:10:32 2024 +0100

    [FLINK-34014][jdbc] Avoid executeBatch when buffer is empty
---
 .../TableBufferReducedStatementExecutor.java         | 20 +++++++++++---------
 .../executor/TableBufferedStatementExecutor.java     | 10 ++++++----
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
index 8be2059b..fd062ccc 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java
@@ -86,17 +86,19 @@ public final class TableBufferReducedStatementExecutor
 
     @Override
     public void executeBatch() throws SQLException {
-        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
-            if (entry.getValue().f0) {
-                upsertExecutor.addToBatch(entry.getValue().f1);
-            } else {
-                // delete by key
-                deleteExecutor.addToBatch(entry.getKey());
+        if (!reduceBuffer.isEmpty()) {
+            for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
+                if (entry.getValue().f0) {
+                    upsertExecutor.addToBatch(entry.getValue().f1);
+                } else {
+                    // delete by key
+                    deleteExecutor.addToBatch(entry.getKey());
+                }
             }
+            upsertExecutor.executeBatch();
+            deleteExecutor.executeBatch();
+            reduceBuffer.clear();
         }
-        upsertExecutor.executeBatch();
-        deleteExecutor.executeBatch();
-        reduceBuffer.clear();
     }
 
     @Override
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
index 89bcca4c..7e5e50ec 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java
@@ -52,11 +52,13 @@ public final class TableBufferedStatementExecutor implements JdbcBatchStatementE
 
     @Override
     public void executeBatch() throws SQLException {
-        for (RowData value : buffer) {
-            statementExecutor.addToBatch(value);
+        if (!buffer.isEmpty()) {
+            for (RowData value : buffer) {
+                statementExecutor.addToBatch(value);
+            }
+            statementExecutor.executeBatch();
+            buffer.clear();
         }
-        statementExecutor.executeBatch();
-        buffer.clear();
     }
 
     @Override

Reply via email to