This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 08b6f992a [Hotfix][JDBC Sink] Fix JDBC Sink oom bug (#4690)
08b6f992a is described below

commit 08b6f992aac166e192f6cfceabbc65ca710b06f1
Author: Eric <[email protected]>
AuthorDate: Sat May 6 19:46:31 2023 +0800

    [Hotfix][JDBC Sink] Fix JDBC Sink oom bug (#4690)
---
 .../connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java  | 11 +++++++++--
 .../connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java        |  1 +
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index 00bfd6634..873a542f5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
 
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
@@ -126,7 +127,7 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>> implem
         return exec;
     }
 
-    private void checkFlushException() {
+    public void checkFlushException() {
         if (flushException != null) {
             throw new JdbcConnectorException(
                     CommonErrorCode.FLUSH_DATA_FAILED,
@@ -155,7 +156,13 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>> implem
     }
 
     public synchronized void flush() throws IOException {
-        checkFlushException();
+        if (flushException != null) {
+            LOG.warn(
+                    String.format(
+                            "An exception occurred during the previous flush 
process %s, skipping this flush",
+                            ExceptionUtils.getMessage(flushException)));
+            return;
+        }
         final int sleepMs = 1000;
         for (int i = 0; i <= jdbcConnectionConfig.getMaxRetries(); i++) {
             try {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 89573fd4e..db543e170 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -84,6 +84,7 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
     @Override
     public Optional<XidInfo> prepareCommit() throws IOException {
         tryOpen();
+        outputFormat.checkFlushException();
         outputFormat.flush();
         try {
             if (!connectionProvider.getConnection().getAutoCommit()) {

Reply via email to