This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new db434adc1 [Feature] [Connector-V2 E2E] Add mysql and postgres e2e test
and bug fix (#2838)
db434adc1 is described below
commit db434adc157811b87c7cd08c9dd023e9ef7f8b9d
Author: ic4y <[email protected]>
AuthorDate: Sat Sep 24 09:41:26 2022 +0800
[Feature] [Connector-V2 E2E] Add mysql and postgres e2e test and bug fix
(#2838)
* add mysql and postgres e2e test
* fix bug
---
.../internal/dialect/mysql/MySqlTypeMapper.java | 3 +-
.../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java | 8 +-
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 1 +
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 3 +
.../jdbc/internal/xa/SemanticXidGeneratorTest.java | 49 +++++
.../{ => connector-jdbc-it}/pom.xml | 30 ++-
.../jdbc/internal/xa/XaGroupOpsImplIT.java | 126 ++++++++++++
.../src/test/resources/log4j.properties | 22 ++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../connector-jdbc-flink-e2e/pom.xml | 14 +-
.../seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java | 218 ++++++++++++++++++++
.../e2e/flink/v2/jdbc/JdbcPostgresIT.java | 191 ++++++++++++++++++
.../test/resources/jdbc/init_sql/mysql_init.conf | 221 +++++++++++++++++++++
.../resources/jdbc/jdbc_mysql_source_and_sink.conf | 44 ++++
.../jdbc/jdbc_mysql_source_and_sink_datatype.conf | 116 +++++++++++
.../jdbc/jdbc_mysql_source_and_sink_parallel.conf | 52 +++++
...mysql_source_and_sink_parallel_upper_lower.conf | 54 +++++
.../jdbc/jdbc_mysql_source_and_sink_xa.conf | 54 +++++
.../jdbc/jdbc_postgres_source_and_sink.conf | 44 ++++
.../jdbc_postgres_source_and_sink_parallel.conf | 52 +++++
...tgres_source_and_sink_parallel_upper_lower.conf | 54 +++++
.../jdbc/jdbc_postgres_source_and_sink_xa.conf | 53 +++++
.../connector-jdbc-spark-e2e/pom.xml | 19 ++
.../seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java | 218 ++++++++++++++++++++
.../e2e/spark/v2/jdbc/JdbcPostgresIT.java | 191 ++++++++++++++++++
.../test/resources/jdbc/init_sql/mysql_init.conf | 216 ++++++++++++++++++++
.../resources/jdbc/jdbc_mysql_source_and_sink.conf | 44 ++++
.../jdbc/jdbc_mysql_source_and_sink_datatype.conf | 114 +++++++++++
.../jdbc/jdbc_mysql_source_and_sink_parallel.conf | 52 +++++
...mysql_source_and_sink_parallel_upper_lower.conf | 54 +++++
.../jdbc/jdbc_mysql_source_and_sink_xa.conf | 54 +++++
.../jdbc/jdbc_postgres_source_and_sink.conf | 44 ++++
.../jdbc_postgres_source_and_sink_parallel.conf | 52 +++++
...tgres_source_and_sink_parallel_upper_lower.conf | 54 +++++
.../jdbc/jdbc_postgres_source_and_sink_xa.conf | 53 +++++
35 files changed, 2553 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index db432f62d..aa8206185 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -103,6 +103,7 @@ public class MySqlTypeMapper implements
JdbcDialectTypeMapper {
case MYSQL_MEDIUMINT_UNSIGNED:
case MYSQL_INT:
case MYSQL_INTEGER:
+ case MYSQL_YEAR:
return BasicType.INT_TYPE;
case MYSQL_INT_UNSIGNED:
case MYSQL_INTEGER_UNSIGNED:
@@ -138,8 +139,6 @@ public class MySqlTypeMapper implements
JdbcDialectTypeMapper {
+ "the precision will be set to 2147483647.",
MYSQL_LONGTEXT);
return BasicType.STRING_TYPE;
-
- case MYSQL_YEAR:
case MYSQL_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case MYSQL_TIME:
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index ff2012fd7..9ad3bbe93 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
public class XaGroupOpsImpl
implements XaGroupOps {
@@ -112,8 +113,11 @@ public class XaGroupOpsImpl
}
@Override
- public void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
- Collection<Xid> recovered = xaFacade.recover();
+ public void recoverAndRollback(JobContext context, SinkWriter.Context
sinkContext, XidGenerator xidGenerator,
+ Xid excludeXid) {
+ Collection<Xid> recovered = xaFacade.recover().stream()
+ .map(x -> new XidImpl(x.getFormatId(), x.getGlobalTransactionId(),
x.getBranchQualifier())).collect(
+ Collectors.toList());
recovered.remove(excludeXid);
if (recovered.isEmpty()) {
return;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 20461db9f..0b26da681 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -134,6 +134,7 @@ public class JdbcExactlyOnceSinkWriter
@Override
public Optional<XidInfo> prepareCommit()
throws IOException {
+ tryOpen();
prepareCurrentTx();
this.currentXid = null;
beginTx();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index f081ad1c0..86aa5f9ae 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -79,6 +79,7 @@ public class JdbcSinkWriter implements
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
@Override
public Optional<XidInfo> prepareCommit()
throws IOException {
+ tryOpen();
outputFormat.flush();
return Optional.empty();
}
@@ -91,6 +92,8 @@ public class JdbcSinkWriter implements
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
@Override
public void close()
throws IOException {
+ tryOpen();
+ outputFormat.flush();
outputFormat.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
new file mode 100644
index 000000000..906c3bd33
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.transaction.xa.Xid;
+
+class SemanticXidGeneratorTest {
+ private JobContext jobContext;
+ private SemanticXidGenerator xidGenerator;
+
+ @BeforeEach
+ void before() {
+ jobContext = new JobContext();
+ xidGenerator = new SemanticXidGenerator();
+ xidGenerator.open();
+ }
+
+ @Test
+ void testBelongsToSubtask() {
+ DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(1);
+ Xid xid1 = xidGenerator.generateXid(jobContext, dc1,
System.currentTimeMillis());
+ Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext,
dc1));
+ Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, jobContext,
new DefaultSinkWriterContext(2)));
+ Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, new
JobContext(), dc1));
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
similarity index 68%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
index dc1149995..de81e8ff2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
@@ -17,38 +17,34 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <modules>
- <module>connector-assert-e2e</module>
- </modules>
- <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <artifactId>connector-jdbc-it</artifactId>
+
+ <properties>
+ <testcontainers.version>1.17.3</testcontainers.version>
+ </properties>
<dependencies>
+ <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e-common</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-starter</artifactId>
+ <artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-starter</artifactId>
- <version>${project.version}</version>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
new file mode 100644
index 000000000..dd8b02d3e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.util.stream.Stream;
+
+class XaGroupOpsImplIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(XaGroupOpsImplIT.class);
+ private MySQLContainer<?> mc;
+ private XaGroupOps xaGroupOps;
+ private SemanticXidGenerator xidGenerator;
+ private JdbcConnectionOptions jdbcConnectionOptions;
+ private XaFacade xaFacade;
+ private XAResource xaResource;
+
+ @BeforeEach
+ void before() throws Exception {
+ // Non-root users must be granted the XA_RECOVER_ADMIN privilege
+ mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+ .withUsername("root")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(mc)).join();
+
+ jdbcConnectionOptions = JdbcConnectionOptions.builder()
+ .withUrl(mc.getJdbcUrl())
+ .withUsername(mc.getUsername())
+ .withPassword(mc.getPassword())
+ .withXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource")
+ .build();
+
+ xidGenerator = new SemanticXidGenerator();
+ xidGenerator.open();
+ xaFacade = new XaFacadeImplAutoLoad(jdbcConnectionOptions);
+ xaFacade.open();
+ xaGroupOps = new XaGroupOpsImpl(xaFacade);
+
+ XADataSource xaDataSource = (XADataSource)
DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
+ xaResource = xaDataSource.getXAConnection().getXAResource();
+
+ }
+
+ @Test
+ void testRecoverAndRollback() throws Exception {
+ JobContext jobContext = new JobContext();
+ SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1);
+ Xid xid1 =
+ xidGenerator.generateXid(jobContext, writerContext1,
System.currentTimeMillis());
+ Xid xid2 =
+ xidGenerator.generateXid(jobContext, writerContext1,
System.currentTimeMillis() + 1);
+
+ xaFacade.start(xid1);
+ xaFacade.endAndPrepare(xid1);
+
+ xaFacade.start(xid2);
+ xaFacade.endAndPrepare(xid2);
+
+ Assertions.assertTrue(checkPreparedXid(xid1));
+ Assertions.assertTrue(checkPreparedXid(xid2));
+
+ xaGroupOps.recoverAndRollback(jobContext, writerContext1,
xidGenerator, xid2);
+
+ Assertions.assertFalse(checkPreparedXid(xid1));
+ Assertions.assertTrue(checkPreparedXid(xid2));
+
+ }
+
+ private boolean checkPreparedXid(Xid xidCrr) throws XAException {
+ Xid[] recover = xaResource.recover(TMSTARTRSCAN);
+ for (int i = 0; i < recover.length; i++) {
+ XidImpl xid = new XidImpl(recover[i].getFormatId(),
recover[i].getGlobalTransactionId(),
+ recover[i].getBranchQualifier());
+ if (xid.equals(xidCrr)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (mc != null) {
+ mc.stop();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index dc1149995..0c42e9ba8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -25,6 +25,7 @@
<packaging>pom</packaging>
<modules>
<module>connector-assert-e2e</module>
+ <module>connector-jdbc-it</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index 1d46370b6..3cf6f5d77 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -25,6 +25,10 @@
<artifactId>connector-jdbc-flink-e2e</artifactId>
+ <properties>
+ <testcontainers.version>1.17.3</testcontainers.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -59,9 +63,17 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
- <version>1.17.3</version>
+ <version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
new file mode 100644
index 000000000..572e82ad4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcMysqlIT extends FlinkContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcMysqlIT.class);
+ private MySQLContainer<?> mc;
+ private Config config;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws Exception {
+ // Non-root users must be granted the XA_RECOVER_ADMIN privilege when
is_exactly_once = "true"
+ mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withUsername("root")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(mc)).join();
+ LOGGER.info("Mysql container started");
+ Class.forName(mc.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcTable());
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ java.net.URL resource =
FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf");
+ if (resource == null) {
+ throw new IllegalArgumentException("can't find find file");
+ }
+
+ config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
+
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ statement.execute(config.getString("source_table"));
+ statement.execute(config.getString("sink_table"));
+ statement.execute(config.getString("type_source_table"));
+ statement.execute(config.getString("type_sink_table"));
+
statement.execute(config.getString("insert_type_source_table_sql"));
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() {
+ String sql = "insert into source(name, age) values(?,?)";
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ for (List row : generateTestDataset()) {
+ preparedStatement.setString(1, (String) row.get(0));
+ preparedStatement.setInt(2, (Integer) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSink() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // Sorting is required: rows are read in parallel, so they may arrive
out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws
Exception {
+ Container.ExecResult execResult =
+
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // Sorting is required: rows are read in parallel, so they may arrive
out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+
+ //lower=1 upper=50
+ List<List> limit50 =
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+ Assertions.assertIterableEquals(limit50, sortedResult);
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkXA() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_xa.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ checkSinkDataTypeTable();
+ }
+
+ private void checkSinkDataTypeTable() throws Exception {
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+ resultSet.next();
+ Assertions.assertEquals(resultSet.getInt(1), 2);
+ }
+ }
+
+ private List<List> queryResult() {
+ List<List> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "select name , age from sink ";
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(
+ Arrays.asList(
+ resultSet.getString(1),
+ resultSet.getInt(2)
+ )
+ );
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Query result data failed!", e);
+ }
+ return result;
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 1000; i++) {
+ rows.add(
+ Arrays.asList(
+ String.format("user_%s", i),
+ i
+ ));
+ }
+ return rows;
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (mc != null) {
+ mc.stop();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
new file mode 100644
index 000000000..c45e0693e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcPostgresIT extends FlinkContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcPostgresIT.class);
+ private PostgreSQLContainer<?> pg;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws Exception {
+ pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withCommand("postgres -c max_prepared_transactions=100")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(pg)).join();
+ LOGGER.info("Postgres container started");
+ Class.forName(pg.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcTable());
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ Statement statement = connection.createStatement();
+ String source = "create table source(\n" +
+ "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+ "name char(10),\n" +
+ "age INT\n" +
+ ")";
+ String sink = "create table sink(\n" +
+ "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+ "name char(10),\n" +
+ "age INT\n" +
+ ")";
+ statement.execute(source);
+ statement.execute(sink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() {
+ String sql = "insert into source(name, age) values(?,?)";
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ for (List row : generateTestDataset()) {
+ preparedStatement.setString(1, (String) row.get(0));
+ preparedStatement.setInt(2, (Integer) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSink() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws
Exception {
+ Container.ExecResult execResult =
+
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+
+ //lower=1 upper=50
+ List<List> limit50 =
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+ Assertions.assertIterableEquals(limit50, sortedResult);
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkXA() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ private List<List> queryResult() {
+ List<List> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "select name , age from sink ";
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(
+ Arrays.asList(
+ resultSet.getString(1).replace(" ", ""),
+ resultSet.getInt(2)
+ )
+ );
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Query result data failed!", e);
+ }
+ return result;
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 1000; i++) {
+ rows.add(
+ Arrays.asList(
+ String.format("user_%s", i),
+ i
+ ));
+ }
+ return rows;
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (pg != null) {
+ pg.stop();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
new file mode 100644
index 000000000..88178d7d1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
@@ -0,0 +1,221 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+source_table = """ CREATE TABLE source ( `user_id` INT PRIMARY KEY NOT NULL
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `user_id` INT PRIMARY KEY NOT NULL
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `time` time DEFAULT NULL,
+ `year` YEAR DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `time` time DEFAULT NULL,
+ `year` YEAR DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `time`,
+ `year`,
+ `timestamp`
+)
+VALUES
+ (
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ '{}',
+ '2022-09-22',
+ '2022-09-22 15:07:44',
+ '15:07:48',
+ 2022,
+ '2022-09-22 15:07:55'
+ )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+ count( 1 )
+FROM
+ ( SELECT * FROM type_source_table UNION ALL SELECT * FROM
type_sink_table ) a
+GROUP BY
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `time`,
+ `year`,
+ `timestamp`
+"""
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
new file mode 100644
index 000000000..4e216040b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
new file mode 100644
index 000000000..840467eea
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from type_source_table"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = """ INSERT INTO `type_sink_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `time`,
+ `year`,
+ `timestamp`
+ )
+ VALUES
+ (
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?
+ )"""
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
new file mode 100644
index 000000000..718207133
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..9222dd9f3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ partition_lower_bound=1
+ partition_upper_bound=50
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
new file mode 100644
index 000000000..7d6bfa960
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 1000
+}
+
+source {
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+
+ max_retries = 0
+ query = "insert into sink(name,age) values(?,?)"
+
+ # Non-root users must be granted the XA_RECOVER_ADMIN privilege when is_exactly_once = "true"
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+ max_commit_attempts = 3
+ transaction_timeout_sec = 86400
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
new file mode 100644
index 000000000..20244099e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
new file mode 100644
index 000000000..d60fbca69
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..8f929f70d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ partition_lower_bound=1
+ partition_upper_bound=50
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+
+ user = "test"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
new file mode 100644
index 000000000..33662ae58
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 1000
+}
+
+source {
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+
+ max_retries = 0
+ query = "insert into sink(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+ max_commit_attempts = 3
+ transaction_timeout_sec = 86400
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index bd0a341fe..93cf5189b 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -25,6 +25,10 @@
<artifactId>connector-jdbc-spark-e2e</artifactId>
+ <properties>
+ <testcontainers.version>1.17.3</testcontainers.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -43,6 +47,21 @@
<scope>test</scope>
</dependency>
+ <!-- db container -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- jdbc drivers -->
<dependency>
<groupId>mysql</groupId>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
new file mode 100644
index 000000000..a5ffb0180
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcMysqlIT extends SparkContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcMysqlIT.class);
+ private MySQLContainer<?> mc;
+ private Config config;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws Exception {
+ // Non-root users need to grant XA_RECOVER_ADMIN permission on
is_exactly_once = "true"
+ mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withUsername("root")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(mc)).join();
+ LOGGER.info("Mysql container started");
+ Class.forName(mc.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcTable());
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ URL resource =
JdbcMysqlIT.class.getResource("/jdbc/init_sql/mysql_init.conf");
+ if (resource == null) {
+ throw new IllegalArgumentException("can't find find file");
+ }
+
+ config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
+
+ CheckConfigUtil.checkAllExists(this.config, "source_table",
"sink_table", "type_source_table",
+ "type_sink_table", "insert_type_source_table_sql",
"check_type_sink_table_sql");
+
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ statement.execute(config.getString("source_table"));
+ statement.execute(config.getString("sink_table"));
+ statement.execute(config.getString("type_source_table"));
+ statement.execute(config.getString("type_sink_table"));
+
statement.execute(config.getString("insert_type_source_table_sql"));
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() {
+ String sql = "insert into source(name, age) values(?,?)";
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ for (List row : generateTestDataset()) {
+ preparedStatement.setString(1, (String) row.get(0));
+ preparedStatement.setInt(2, (Integer) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSink() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws
Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+
+ //lower=1 upper=50
+ List<List> limit50 =
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+ Assertions.assertIterableEquals(limit50, sortedResult);
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkXA() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_xa.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ checkSinkDataTypeTable();
+ }
+
+ private void checkSinkDataTypeTable() throws Exception {
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+ resultSet.next();
+ Assertions.assertEquals(resultSet.getInt(1), 2);
+ }
+ }
+
+ private List<List> queryResult() {
+ List<List> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(),
mc.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "select name , age from sink ";
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(
+ Arrays.asList(
+ resultSet.getString(1),
+ resultSet.getInt(2)
+ )
+ );
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Query result data failed!", e);
+ }
+ return result;
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 1000; i++) {
+ rows.add(
+ Arrays.asList(
+ String.format("user_%s", i),
+ i
+ ));
+ }
+ return rows;
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (mc != null) {
+ mc.stop();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
new file mode 100644
index 000000000..cd2e55d6f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcPostgresIT extends SparkContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JdbcPostgresIT.class);
+ private PostgreSQLContainer<?> pg;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws Exception {
+ pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withCommand("postgres -c max_prepared_transactions=100")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(pg)).join();
+ LOGGER.info("Postgres container started");
+ Class.forName(pg.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> initializeJdbcTable());
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ Statement statement = connection.createStatement();
+ String source = "create table source(\n" +
+ "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+ "name char(10),\n" +
+ "age INT\n" +
+ ")";
+ String sink = "create table sink(\n" +
+ "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+ "name char(10),\n" +
+ "age INT\n" +
+ ")";
+ statement.execute(source);
+ statement.execute(sink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Mysql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() {
+ String sql = "insert into source(name, age) values(?,?)";
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ for (List row : generateTestDataset()) {
+ preparedStatement.setString(1, (String) row.get(0));
+ preparedStatement.setInt(2, (Integer) row.get(1));
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSink() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+ Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws
Exception {
+ Container.ExecResult execResult =
+
executeSeaTunnelSparkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ //Sorting is required, because it is read in parallel, so there will
be out of order
+ List<List> sortedResult =
queryResult().stream().sorted(Comparator.comparing(list -> (Integer)
list.get(1)))
+ .collect(Collectors.toList());
+
+ //lower=1 upper=50
+ List<List> limit50 =
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+ Assertions.assertIterableEquals(limit50, sortedResult);
+ }
+
+ @Test
+ public void testJdbcPostgresSourceAndSinkXA() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+ }
+
+ private List<List> queryResult() {
+ List<List> result = new ArrayList<>();
+ try (Connection connection =
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(),
pg.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "select name , age from sink ";
+ ResultSet resultSet = statement.executeQuery(sql);
+ while (resultSet.next()) {
+ result.add(
+ Arrays.asList(
+ resultSet.getString(1).replace(" ", ""),
+ resultSet.getInt(2)
+ )
+ );
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Query result data failed!", e);
+ }
+ return result;
+ }
+
+ private static List<List> generateTestDataset() {
+ List<List> rows = new ArrayList<>();
+ for (int i = 1; i <= 1000; i++) {
+ rows.add(
+ Arrays.asList(
+ String.format("user_%s", i),
+ i
+ ));
+ }
+ return rows;
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (pg != null) {
+ pg.stop();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
new file mode 100644
index 000000000..79c03996f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+source_table = """ CREATE TABLE source ( `user_id` INT PRIMARY KEY NOT NULL
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `user_id` INT PRIMARY KEY NOT NULL
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `year` YEAR DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+ `binary` BINARY ( 64 ) DEFAULT NULL,
+ `blob` BLOB,
+ `long_varbinary` MEDIUMBLOB,
+ `longblob` LONGBLOB,
+ `tinyblob` TINYBLOB,
+ `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+ `tinyint` TINYINT DEFAULT NULL,
+ `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+ `smallint` SMALLINT DEFAULT NULL,
+ `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+ `mediumint` MEDIUMINT DEFAULT NULL,
+ `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+ `int` INT DEFAULT NULL,
+ `int_unsigned` INT UNSIGNED DEFAULT NULL,
+ `integer` INT DEFAULT NULL,
+ `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+ `bigint` BIGINT DEFAULT NULL,
+ `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+ `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+ `float` FLOAT DEFAULT NULL,
+ `double` DOUBLE DEFAULT NULL,
+ `double_precision` DOUBLE DEFAULT NULL,
+
+ `longtext` LONGTEXT,
+ `mediumtext` MEDIUMTEXT,
+ `text` text,
+ `tinytext` TINYTEXT,
+ `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+ `json` json DEFAULT NULL,
+
+ `date` date DEFAULT NULL,
+ `datetime` datetime DEFAULT NULL,
+ `year` YEAR DEFAULT NULL,
+ `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `year`,
+ `timestamp`
+)
+VALUES
+ (
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ 'a',
+ '{}',
+ '2022-09-22',
+ '2022-09-22 15:07:44',
+ 2022,
+ '2022-09-22 15:07:55'
+ )
+"""
+
+# Verification query for the data-type round trip.
+# It concatenates type_source_table and type_sink_table with UNION ALL and groups the
+# combined rows by every column, returning one count per group of identical rows.
+# When each sink row matches a source row in all columns the pair falls into one group,
+# so the count is 2; a count of 1 means a row exists in only one table or differs in
+# some column. GROUP BY puts NULL values into the same group, so the NULL binary
+# columns do not prevent a match.
+check_type_sink_table_sql = """
+SELECT
+ count( 1 )
+FROM
+ ( SELECT * FROM type_source_table UNION ALL SELECT * FROM
type_sink_table ) a
+GROUP BY
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `year`,
+ `timestamp`
+"""
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
new file mode 100644
index 000000000..06ab2214e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
new file mode 100644
index 000000000..08110f747
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from type_source_table"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = """ INSERT INTO `type_sink_table` (
+ `binary`,
+ `blob`,
+ `long_varbinary`,
+ `longblob`,
+ `tinyblob`,
+ `varbinary`,
+ `tinyint`,
+ `tinyint_unsigned`,
+ `smallint`,
+ `smallint_unsigned`,
+ `mediumint`,
+ `mediumint_unsigned`,
+ `int`,
+ `int_unsigned`,
+ `integer`,
+ `integer_unsigned`,
+ `bigint`,
+ `bigint_unsigned`,
+ `numeric`,
+ `decimal`,
+ `float`,
+ `double`,
+ `double_precision`,
+ `longtext`,
+ `mediumtext`,
+ `text`,
+ `tinytext`,
+ `varchar`,
+ `json`,
+ `date`,
+ `datetime`,
+ `year`,
+ `timestamp`
+ )
+ VALUES
+ (
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?,
+ ?
+ )"""
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
new file mode 100644
index 000000000..53fc85b44
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..a682057e6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 3
+ job.mode = "BATCH"
+}
+
+# Partitioned JDBC read of the `source` table, split on user_id with explicit bounds.
+# The accompanying integration test expects exactly the first 50 generated rows in the
+# sink, so rows outside [partition_lower_bound, partition_upper_bound] are presumably
+# not read - confirm against the JDBC source partitioning logic.
+source{
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select user_id, name , age from source"
+ # Column the read is split on.
+ partition_column= "user_id"
+
+ # Name under which the transform section below queries the rows ("from jdbc").
+ result_table_name = "jdbc"
+ partition_lower_bound=1
+ partition_upper_bound=50
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
new file mode 100644
index 000000000..d3053420c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 1000
+}
+
+source {
+ jdbc{
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql:3306/test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "test"
+
+ max_retries = 0
+ query = "insert into sink(name,age) values(?,?)"
+
+ # Non-root users must be granted the XA_RECOVER_ADMIN privilege
+ # when is_exactly_once = "true".
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+ max_commit_attempts = 3
+ transaction_timeout_sec = 86400
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
new file mode 100644
index 000000000..0e5642551
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
new file mode 100644
index 000000000..7523ac9f6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..875eeb3e8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 3
+ job.mode = "BATCH"
+}
+
+source{
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select user_id, name , age from source"
+ partition_column= "user_id"
+
+ result_table_name = "jdbc"
+ partition_lower_bound=1
+ partition_upper_bound=50
+ }
+}
+
+transform {
+ sql {
+ sql = "select name,age from jdbc"
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+
+ user = "test"
+ password = "test"
+ connection_check_timeout_sec = 100
+ query = "insert into sink(name,age) values(?,?)"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
new file mode 100644
index 000000000..f632d63e7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ source.parallelism = 1
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 1000
+}
+
+source {
+ jdbc{
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+ query = "select name , age from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://postgresql:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "test"
+ password = "test"
+
+ max_retries = 0
+ query = "insert into sink(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+ max_commit_attempts = 3
+ transaction_timeout_sec = 86400
+ }
+}
\ No newline at end of file