This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 08b6f992a [Hotfix][JDBC Sink] Fix JDBC Sink oom bug (#4690)
08b6f992a is described below
commit 08b6f992aac166e192f6cfceabbc65ca710b06f1
Author: Eric <[email protected]>
AuthorDate: Sat May 6 19:46:31 2023 +0800
[Hotfix][JDBC Sink] Fix JDBC Sink oom bug (#4690)
---
.../connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java | 11 +++++++++--
.../connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java | 1 +
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index 00bfd6634..873a542f5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
@@ -126,7 +127,7 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>> implem
return exec;
}
- private void checkFlushException() {
+ public void checkFlushException() {
if (flushException != null) {
throw new JdbcConnectorException(
CommonErrorCode.FLUSH_DATA_FAILED,
@@ -155,7 +156,13 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>> implem
}
public synchronized void flush() throws IOException {
- checkFlushException();
+ if (flushException != null) {
+ LOG.warn(
+ String.format(
+ "An exception occurred during the previous flush
process %s, skipping this flush",
+ ExceptionUtils.getMessage(flushException)));
+ return;
+ }
final int sleepMs = 1000;
for (int i = 0; i <= jdbcConnectionConfig.getMaxRetries(); i++) {
try {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 89573fd4e..db543e170 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -84,6 +84,7 @@ public class JdbcSinkWriter implements
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
@Override
public Optional<XidInfo> prepareCommit() throws IOException {
tryOpen();
+ outputFormat.checkFlushException();
outputFormat.flush();
try {
if (!connectionProvider.getConnection().getAutoCommit()) {