This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new db434adc1 [Feature] [Connector-V2 E2E] Add mysql and postgres e2e test and bug fix (#2838)
db434adc1 is described below

commit db434adc157811b87c7cd08c9dd023e9ef7f8b9d
Author: ic4y <[email protected]>
AuthorDate: Sat Sep 24 09:41:26 2022 +0800

    [Feature] [Connector-V2 E2E] Add mysql and postgres e2e test and bug fix (#2838)
    
    * add mysql and postgres e2e test
    
    * fix bug
---
 .../internal/dialect/mysql/MySqlTypeMapper.java    |   3 +-
 .../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java |   8 +-
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |   1 +
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |   3 +
 .../jdbc/internal/xa/SemanticXidGeneratorTest.java |  49 +++++
 .../{ => connector-jdbc-it}/pom.xml                |  30 ++-
 .../jdbc/internal/xa/XaGroupOpsImplIT.java         | 126 ++++++++++++
 .../src/test/resources/log4j.properties            |  22 ++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../connector-jdbc-flink-e2e/pom.xml               |  14 +-
 .../seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java   | 218 ++++++++++++++++++++
 .../e2e/flink/v2/jdbc/JdbcPostgresIT.java          | 191 ++++++++++++++++++
 .../test/resources/jdbc/init_sql/mysql_init.conf   | 221 +++++++++++++++++++++
 .../resources/jdbc/jdbc_mysql_source_and_sink.conf |  44 ++++
 .../jdbc/jdbc_mysql_source_and_sink_datatype.conf  | 116 +++++++++++
 .../jdbc/jdbc_mysql_source_and_sink_parallel.conf  |  52 +++++
 ...mysql_source_and_sink_parallel_upper_lower.conf |  54 +++++
 .../jdbc/jdbc_mysql_source_and_sink_xa.conf        |  54 +++++
 .../jdbc/jdbc_postgres_source_and_sink.conf        |  44 ++++
 .../jdbc_postgres_source_and_sink_parallel.conf    |  52 +++++
 ...tgres_source_and_sink_parallel_upper_lower.conf |  54 +++++
 .../jdbc/jdbc_postgres_source_and_sink_xa.conf     |  53 +++++
 .../connector-jdbc-spark-e2e/pom.xml               |  19 ++
 .../seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java   | 218 ++++++++++++++++++++
 .../e2e/spark/v2/jdbc/JdbcPostgresIT.java          | 191 ++++++++++++++++++
 .../test/resources/jdbc/init_sql/mysql_init.conf   | 216 ++++++++++++++++++++
 .../resources/jdbc/jdbc_mysql_source_and_sink.conf |  44 ++++
 .../jdbc/jdbc_mysql_source_and_sink_datatype.conf  | 114 +++++++++++
 .../jdbc/jdbc_mysql_source_and_sink_parallel.conf  |  52 +++++
 ...mysql_source_and_sink_parallel_upper_lower.conf |  54 +++++
 .../jdbc/jdbc_mysql_source_and_sink_xa.conf        |  54 +++++
 .../jdbc/jdbc_postgres_source_and_sink.conf        |  44 ++++
 .../jdbc_postgres_source_and_sink_parallel.conf    |  52 +++++
 ...tgres_source_and_sink_parallel_upper_lower.conf |  54 +++++
 .../jdbc/jdbc_postgres_source_and_sink_xa.conf     |  53 +++++
 35 files changed, 2553 insertions(+), 22 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
index db432f62d..aa8206185 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -103,6 +103,7 @@ public class MySqlTypeMapper implements 
JdbcDialectTypeMapper {
             case MYSQL_MEDIUMINT_UNSIGNED:
             case MYSQL_INT:
             case MYSQL_INTEGER:
+            case MYSQL_YEAR:
                 return BasicType.INT_TYPE;
             case MYSQL_INT_UNSIGNED:
             case MYSQL_INTEGER_UNSIGNED:
@@ -138,8 +139,6 @@ public class MySqlTypeMapper implements 
JdbcDialectTypeMapper {
                         + "the precision will be set to 2147483647.",
                     MYSQL_LONGTEXT);
                 return BasicType.STRING_TYPE;
-
-            case MYSQL_YEAR:
             case MYSQL_DATE:
                 return LocalTimeType.LOCAL_DATE_TYPE;
             case MYSQL_TIME:
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
index ff2012fd7..9ad3bbe93 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class XaGroupOpsImpl
     implements XaGroupOps {
@@ -112,8 +113,11 @@ public class XaGroupOpsImpl
     }
 
     @Override
-    public void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
-        Collection<Xid> recovered = xaFacade.recover();
+    public void recoverAndRollback(JobContext context, SinkWriter.Context 
sinkContext, XidGenerator xidGenerator,
+                                   Xid excludeXid) {
+        Collection<Xid> recovered = xaFacade.recover().stream()
+            .map(x -> new XidImpl(x.getFormatId(), x.getGlobalTransactionId(), 
x.getBranchQualifier())).collect(
+                Collectors.toList());
         recovered.remove(excludeXid);
         if (recovered.isEmpty()) {
             return;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 20461db9f..0b26da681 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -134,6 +134,7 @@ public class JdbcExactlyOnceSinkWriter
     @Override
     public Optional<XidInfo> prepareCommit()
         throws IOException {
+        tryOpen();
         prepareCurrentTx();
         this.currentXid = null;
         beginTx();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index f081ad1c0..86aa5f9ae 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -79,6 +79,7 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
     @Override
     public Optional<XidInfo> prepareCommit()
         throws IOException {
+        tryOpen();
         outputFormat.flush();
         return Optional.empty();
     }
@@ -91,6 +92,8 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
     @Override
     public void close()
         throws IOException {
+        tryOpen();
+        outputFormat.flush();
         outputFormat.close();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
new file mode 100644
index 000000000..906c3bd33
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGeneratorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.transaction.xa.Xid;
+
+class SemanticXidGeneratorTest {
+    private JobContext jobContext;
+    private SemanticXidGenerator xidGenerator;
+
+    @BeforeEach
+    void before() {
+        jobContext = new JobContext();
+        xidGenerator = new SemanticXidGenerator();
+        xidGenerator.open();
+    }
+
+    @Test
+    void testBelongsToSubtask() {
+        DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(1);
+        Xid xid1 = xidGenerator.generateXid(jobContext, dc1, 
System.currentTimeMillis());
+        Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext, 
dc1));
+        Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, jobContext, 
new DefaultSinkWriterContext(2)));
+        Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, new 
JobContext(), dc1));
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
similarity index 68%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
index dc1149995..de81e8ff2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/pom.xml
@@ -17,38 +17,34 @@
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <modules>
-        <module>connector-assert-e2e</module>
-    </modules>
 
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
+    <artifactId>connector-jdbc-it</artifactId>
+
+    <properties>
+        <testcontainers.version>1.17.3</testcontainers.version>
+    </properties>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>connector-jdbc</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
new file mode 100644
index 000000000..dd8b02d3e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.util.stream.Stream;
+
+class XaGroupOpsImplIT {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(XaGroupOpsImplIT.class);
+    private MySQLContainer<?> mc;
+    private XaGroupOps xaGroupOps;
+    private SemanticXidGenerator xidGenerator;
+    private JdbcConnectionOptions jdbcConnectionOptions;
+    private XaFacade xaFacade;
+    private XAResource xaResource;
+
+    @BeforeEach
+    void before() throws Exception {
+        // Non-root users need to grant XA_RECOVER_ADMIN permission
+        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+            .withUsername("root")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(mc)).join();
+
+        jdbcConnectionOptions = JdbcConnectionOptions.builder()
+            .withUrl(mc.getJdbcUrl())
+            .withUsername(mc.getUsername())
+            .withPassword(mc.getPassword())
+            .withXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource")
+            .build();
+
+        xidGenerator = new SemanticXidGenerator();
+        xidGenerator.open();
+        xaFacade = new XaFacadeImplAutoLoad(jdbcConnectionOptions);
+        xaFacade.open();
+        xaGroupOps = new XaGroupOpsImpl(xaFacade);
+
+        XADataSource xaDataSource = (XADataSource) 
DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
+        xaResource = xaDataSource.getXAConnection().getXAResource();
+
+    }
+
+    @Test
+    void testRecoverAndRollback() throws Exception {
+        JobContext jobContext = new JobContext();
+        SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1);
+        Xid xid1 =
+            xidGenerator.generateXid(jobContext, writerContext1, 
System.currentTimeMillis());
+        Xid xid2 =
+            xidGenerator.generateXid(jobContext, writerContext1, 
System.currentTimeMillis() + 1);
+
+        xaFacade.start(xid1);
+        xaFacade.endAndPrepare(xid1);
+
+        xaFacade.start(xid2);
+        xaFacade.endAndPrepare(xid2);
+
+        Assertions.assertTrue(checkPreparedXid(xid1));
+        Assertions.assertTrue(checkPreparedXid(xid2));
+
+        xaGroupOps.recoverAndRollback(jobContext, writerContext1, 
xidGenerator, xid2);
+
+        Assertions.assertFalse(checkPreparedXid(xid1));
+        Assertions.assertTrue(checkPreparedXid(xid2));
+
+    }
+
+    private boolean checkPreparedXid(Xid xidCrr) throws XAException {
+        Xid[] recover = xaResource.recover(TMSTARTRSCAN);
+        for (int i = 0; i < recover.length; i++) {
+            XidImpl xid = new XidImpl(recover[i].getFormatId(), 
recover[i].getGlobalTransactionId(),
+                recover[i].getBranchQualifier());
+            if (xid.equals(xidCrr)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (mc != null) {
+            mc.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-it/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index dc1149995..0c42e9ba8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -25,6 +25,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>connector-assert-e2e</module>
+        <module>connector-jdbc-it</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index 1d46370b6..3cf6f5d77 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -25,6 +25,10 @@
 
     <artifactId>connector-jdbc-flink-e2e</artifactId>
 
+    <properties>
+        <testcontainers.version>1.17.3</testcontainers.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -59,9 +63,17 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
-            <version>1.17.3</version>
+            <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
new file mode 100644
index 000000000..572e82ad4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcMysqlIT extends FlinkContainer {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcMysqlIT.class);
+    private MySQLContainer<?> mc;
+    private Config config;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws Exception {
+        // Non-root users need to grant XA_RECOVER_ADMIN permission on 
is_exactly_once = "true"
+        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("mysql")
+            .withUsername("root")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(mc)).join();
+        LOGGER.info("Mysql container started");
+        Class.forName(mc.getDriverClassName());
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(5, TimeUnit.SECONDS)
+            .untilAsserted(() -> initializeJdbcTable());
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        java.net.URL resource = 
FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+
+        config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
+
+        CheckConfigUtil.checkAllExists(this.config, "source_table", 
"sink_table", "type_source_table",
+            "type_sink_table", "insert_type_source_table_sql", 
"check_type_sink_table_sql");
+
+        try (Connection connection = 
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), 
mc.getPassword())) {
+            Statement statement = connection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            
statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Mysql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() {
+        String sql = "insert into source(name, age) values(?,?)";
+        try (Connection connection = 
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), 
mc.getPassword())) {
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+            for (List row : generateTestDataset()) {
+                preparedStatement.setString(1, (String) row.get(0));
+                preparedStatement.setInt(2, (Integer) row.get(1));
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSink() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        //Sorting is required, because it is read in parallel, so there will 
be out of order
+        List<List> sortedResult = 
queryResult().stream().sorted(Comparator.comparing(list -> (Integer) 
list.get(1)))
+            .collect(Collectors.toList());
+        Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws 
Exception {
+        Container.ExecResult execResult =
+            
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        //Sorting is required, because it is read in parallel, so there will 
be out of order
+        List<List> sortedResult = 
queryResult().stream().sorted(Comparator.comparing(list -> (Integer) 
list.get(1)))
+            .collect(Collectors.toList());
+
+        //lower=1 upper=50
+        List<List> limit50 = 
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+        Assertions.assertIterableEquals(limit50, sortedResult);
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkXA() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_xa.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        try (Connection connection = 
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), 
mc.getPassword())) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = 
statement.executeQuery(config.getString("check_type_sink_table_sql"));
+            resultSet.next();
+            Assertions.assertEquals(resultSet.getInt(1), 2);
+        }
+    }
+
+    private List<List> queryResult() {
+        List<List> result = new ArrayList<>();
+        try (Connection connection = 
DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), 
mc.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "select name , age from sink ";
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(
+                    Arrays.asList(
+                        resultSet.getString(1),
+                        resultSet.getInt(2)
+                    )
+                );
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Query result data failed!", e);
+        }
+        return result;
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 1000; i++) {
+            rows.add(
+                Arrays.asList(
+                    String.format("user_%s", i),
+                    i
+                ));
+        }
+        return rows;
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (mc != null) {
+            mc.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
new file mode 100644
index 000000000..c45e0693e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcPostgresIT extends FlinkContainer {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcPostgresIT.class);
+    private PostgreSQLContainer<?> pg;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws Exception {
+        pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withCommand("postgres -c max_prepared_transactions=100")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(pg)).join();
+        LOGGER.info("Postgres container started");
+        Class.forName(pg.getDriverClassName());
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(10, TimeUnit.SECONDS)
+            .untilAsserted(() -> initializeJdbcTable());
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = 
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), 
pg.getPassword())) {
+            Statement statement = connection.createStatement();
+            String source = "create table source(\n" +
+                "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+                "name char(10),\n" +
+                "age INT\n" +
+                ")";
+            String sink = "create table sink(\n" +
+                "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+                "name char(10),\n" +
+                "age INT\n" +
+                ")";
+            statement.execute(source);
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Mysql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() {
+        String sql = "insert into source(name, age) values(?,?)";
+        try (Connection connection = 
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), 
pg.getPassword())) {
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+            for (List row : generateTestDataset()) {
+                preparedStatement.setString(1, (String) row.get(0));
+                preparedStatement.setInt(2, (Integer) row.get(1));
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testJdbcPostgresSourceAndSink() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+    }
+
+    @Test
+    public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        //Sorting is required, because it is read in parallel, so there will 
be out of order
+        List<List> sortedResult = 
queryResult().stream().sorted(Comparator.comparing(list -> (Integer) 
list.get(1)))
+            .collect(Collectors.toList());
+        Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
+    }
+
+    @Test
+    public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws 
Exception {
+        Container.ExecResult execResult =
+            
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        //Sorting is required, because it is read in parallel, so there will 
be out of order
+        List<List> sortedResult = 
queryResult().stream().sorted(Comparator.comparing(list -> (Integer) 
list.get(1)))
+            .collect(Collectors.toList());
+
+        //lower=1 upper=50
+        List<List> limit50 = 
generateTestDataset().stream().limit(50).collect(Collectors.toList());
+        Assertions.assertIterableEquals(limit50, sortedResult);
+    }
+
+    @Test
+    public void testJdbcPostgresSourceAndSinkXA() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
+    }
+
+    private List<List> queryResult() {
+        List<List> result = new ArrayList<>();
+        try (Connection connection = 
DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), 
pg.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "select name , age from sink ";
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(
+                    Arrays.asList(
+                        resultSet.getString(1).replace(" ", ""),
+                        resultSet.getInt(2)
+                    )
+                );
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Query result data failed!", e);
+        }
+        return result;
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 1000; i++) {
+            rows.add(
+                Arrays.asList(
+                    String.format("user_%s", i),
+                    i
+                ));
+        }
+        return rows;
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (pg != null) {
+            pg.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
new file mode 100644
index 000000000..88178d7d1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
@@ -0,0 +1,221 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+source_table = """ CREATE TABLE source ( `user_id` INT PRIMARY KEY NOT NULL 
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `user_id` INT PRIMARY KEY NOT NULL 
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+       `time` time DEFAULT NULL,
+       `year` YEAR DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+       `time` time DEFAULT NULL,
+       `year` YEAR DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `time`,
+       `year`,
+       `timestamp`
+)
+VALUES
+       (
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               'a',
+               'a',
+               'a',
+               'a',
+               'a',
+               '{}',
+               '2022-09-22',
+               '2022-09-22 15:07:44',
+               '15:07:48',
+               2022,
+       '2022-09-22 15:07:55'
+       )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+       count( 1 )
+FROM
+       ( SELECT * FROM type_source_table UNION ALL SELECT * FROM 
type_sink_table ) a
+GROUP BY
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `time`,
+       `year`,
+       `timestamp`
+"""
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
new file mode 100644
index 000000000..4e216040b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
new file mode 100644
index 000000000..840467eea
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select * from type_source_table"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = """ INSERT INTO `type_sink_table` (
+                       `binary`,
+                       `blob`,
+                       `long_varbinary`,
+                       `longblob`,
+                       `tinyblob`,
+                       `varbinary`,
+                       `tinyint`,
+                       `tinyint_unsigned`,
+                       `smallint`,
+                       `smallint_unsigned`,
+                       `mediumint`,
+                       `mediumint_unsigned`,
+                       `int`,
+                       `int_unsigned`,
+                       `integer`,
+                       `integer_unsigned`,
+                       `bigint`,
+                       `bigint_unsigned`,
+                       `numeric`,
+                       `decimal`,
+                       `float`,
+                       `double`,
+                       `double_precision`,
+                       `longtext`,
+                       `mediumtext`,
+                       `text`,
+                       `tinytext`,
+                       `varchar`,
+                       `json`,
+                       `date`,
+                       `datetime`,
+                       `time`,
+                       `year`,
+                       `timestamp`
+                 )
+                 VALUES
+                       (
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                           ?
+                       )"""
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
new file mode 100644
index 000000000..718207133
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..9222dd9f3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+        partition_lower_bound=1
+        partition_upper_bound=50
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
new file mode 100644
index 000000000..7d6bfa960
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 1000
+}
+
+source {
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+       user = "root"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+
+        max_retries = 0
+        query = "insert into sink(name,age) values(?,?)"
+
+        # Non-root users must be granted the XA_RECOVER_ADMIN privilege when
+        # is_exactly_once = "true".
+        is_exactly_once = "true"
+
+        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+        max_commit_attempts = 3
+        transaction_timeout_sec = 86400
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
new file mode 100644
index 000000000..20244099e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
new file mode 100644
index 000000000..d60fbca69
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..8f929f70d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+        partition_lower_bound=1
+        partition_upper_bound=50
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+
+        user = "test"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
new file mode 100644
index 000000000..33662ae58
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 1000
+}
+
+source {
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+
+        max_retries = 0
+        query = "insert into sink(name,age) values(?,?)"
+
+        is_exactly_once = "true"
+
+        xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+        max_commit_attempts = 3
+        transaction_timeout_sec = 86400
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index bd0a341fe..93cf5189b 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -25,6 +25,10 @@
 
     <artifactId>connector-jdbc-spark-e2e</artifactId>
 
+    <properties>
+        <testcontainers.version>1.17.3</testcontainers.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -43,6 +47,21 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- db container -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- jdbc drivers -->
         <dependency>
             <groupId>mysql</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
new file mode 100644
index 000000000..a5ffb0180
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URL;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcMysqlIT extends SparkContainer {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcMysqlIT.class);
+    private MySQLContainer<?> mc;
+    private Config config;
+
+    /**
+     * Starts a MySQL 8.0.29 container before each test and seeds it.
+     *
+     * <p>The container joins the shared {@code NETWORK} under the alias
+     * {@code mysql}. Once it is up, the JDBC driver class is loaded, the
+     * tables are created through {@link #initializeJdbcTable()} (retried via
+     * Awaitility with exceptions ignored, polling every 500 ms for at most
+     * 5 seconds) and the source table is filled by {@link #batchInsertData()}.
+     *
+     * <p>NOTE(review): despite its name this method starts MySQL, not
+     * PostgreSQL; the name looks like a copy-paste leftover.
+     *
+     * @throws Exception if the driver class cannot be loaded or startup fails
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws Exception {
+        // Non-root users need to be granted the XA_RECOVER_ADMIN permission
+        // when is_exactly_once = "true"; the root user is used here.
+        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("mysql")
+            .withUsername("root")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(mc)).join();
+        LOGGER.info("Mysql container started");
+        Class.forName(mc.getDriverClassName());
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(5, TimeUnit.SECONDS)
+            .untilAsserted(() -> initializeJdbcTable());
+        batchInsertData();
+    }
+
+    /**
+     * Creates the test tables and seeds the data-type source table.
+     *
+     * <p>The DDL/DML statements are read from
+     * {@code /jdbc/init_sql/mysql_init.conf} on the classpath. The parsed
+     * config is stored in {@link #config} because the verification query is
+     * read from it later.
+     *
+     * @throws IllegalArgumentException if the init file is not on the classpath
+     * @throws RuntimeException if any of the SQL statements fails
+     */
+    private void initializeJdbcTable() {
+        String initFile = "/jdbc/init_sql/mysql_init.conf";
+        URL resource = JdbcMysqlIT.class.getResource(initFile);
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find file " + initFile);
+        }
+
+        config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
+
+        // NOTE(review): the return value of checkAllExists is ignored here;
+        // confirm whether it throws on a missing key or only reports it.
+        CheckConfigUtil.checkAllExists(this.config,
+            "source_table", "sink_table", "type_source_table",
+            "type_sink_table", "insert_type_source_table_sql",
+            "check_type_sink_table_sql");
+
+        // The statement is a managed resource so it is closed even when one
+        // of the executions throws.
+        try (Connection connection = DriverManager.getConnection(
+                 mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
+             Statement statement = connection.createStatement()) {
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Mysql table failed!", e);
+        }
+    }
+
+    /**
+     * Inserts every row of {@link #generateTestDataset()} into the
+     * {@code source} table as one JDBC batch inside a single transaction.
+     *
+     * @throws RuntimeException if the batch insert or the commit fails
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() {
+        String sql = "insert into source(name, age) values(?,?)";
+        try (Connection connection = DriverManager.getConnection(
+                 mc.getJdbcUrl(), mc.getUsername(), mc.getPassword())) {
+            connection.setAutoCommit(false);
+            // Close the prepared statement explicitly instead of relying on
+            // the connection being closed.
+            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                for (List<?> row : generateTestDataset()) {
+                    preparedStatement.setString(1, (String) row.get(0));
+                    preparedStatement.setInt(2, (Integer) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    /**
+     * Runs the plain MySQL source-to-sink job and expects the sink to hold
+     * the generated dataset, in order.
+     */
+    @Test
+    public void testJdbcMysqlSourceAndSink() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_mysql_source_and_sink.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        final List<List> expectedRows = generateTestDataset();
+        final List<List> actualRows = queryResult();
+        Assertions.assertIterableEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * Runs the parallel MySQL job and expects the sink, once ordered by age,
+     * to hold the generated dataset.
+     */
+    @Test
+    public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_mysql_source_and_sink_parallel.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // Rows are read in parallel and may arrive out of order, so sort
+        // them by age before comparing.
+        final List<List> actualRows = queryResult();
+        actualRows.sort(Comparator.comparing(row -> (Integer) row.get(1)));
+        Assertions.assertIterableEquals(generateTestDataset(), actualRows);
+    }
+
+    /**
+     * Runs the parallel MySQL job with partition bounds and expects the sink,
+     * once ordered by age, to hold the first 50 generated rows.
+     */
+    @Test
+    public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // Rows are read in parallel and may arrive out of order, so sort
+        // them by age before comparing.
+        final List<List> actualRows = queryResult();
+        actualRows.sort(Comparator.comparing(row -> (Integer) row.get(1)));
+
+        // lower=1 upper=50
+        final List<List> firstFifty = generateTestDataset().subList(0, 50);
+        Assertions.assertIterableEquals(firstFifty, actualRows);
+    }
+
+    /**
+     * Runs the MySQL job from the XA config and expects the sink to hold
+     * the generated dataset, in order.
+     */
+    @Test
+    public void testJdbcMysqlSourceAndSinkXA() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_mysql_source_and_sink_xa.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        final List<List> expectedRows = generateTestDataset();
+        final List<List> actualRows = queryResult();
+        Assertions.assertIterableEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * Runs the MySQL data-type job and then verifies the type sink table
+     * through {@link #checkSinkDataTypeTable()}.
+     */
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_mysql_source_and_sink_datatype.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+        checkSinkDataTypeTable();
+    }
+
+    /**
+     * Verifies the data-type sink table against the data-type source table.
+     *
+     * <p>Runs the {@code check_type_sink_table_sql} query from the init
+     * config, which unions both tables and groups by every column, and
+     * expects the first group to contain 2 rows (one from each table).
+     *
+     * @throws Exception if the query fails
+     */
+    private void checkSinkDataTypeTable() throws Exception {
+        try (Connection connection = DriverManager.getConnection(
+                 mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(
+                 config.getString("check_type_sink_table_sql"))) {
+            // Fail with a clear message instead of an SQLException when the
+            // query yields no row at all.
+            Assertions.assertTrue(resultSet.next(), "check query returned no rows");
+            // JUnit expects (expected, actual); the original had them swapped.
+            Assertions.assertEquals(2, resultSet.getInt(1));
+        }
+    }
+
+    /**
+     * Reads all rows of the {@code sink} table.
+     *
+     * @return one {@code [name, age]} list per row, in the order returned by
+     *         the database
+     * @throws RuntimeException if the query fails
+     */
+    private List<List> queryResult() {
+        List<List> result = new ArrayList<>();
+        String sql = "select name , age from sink ";
+        // Statement and result set are managed resources so they are closed
+        // explicitly rather than left to the connection.
+        try (Connection connection = DriverManager.getConnection(
+                 mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+            while (resultSet.next()) {
+                result.add(
+                    Arrays.asList(
+                        resultSet.getString(1),
+                        resultSet.getInt(2)
+                    )
+                );
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Query result data failed!", e);
+        }
+        return result;
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 1000; i++) {
+            rows.add(
+                Arrays.asList(
+                    String.format("user_%s", i),
+                    i
+                ));
+        }
+        return rows;
+    }
+
+    /** Stops the MySQL container after each test, if one was created. */
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (mc == null) {
+            return;
+        }
+        mc.stop();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
new file mode 100644
index 000000000..cd2e55d6f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JdbcPostgresIT extends SparkContainer {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcPostgresIT.class);
+    private PostgreSQLContainer<?> pg;
+
+    /**
+     * Starts a PostgreSQL 14.3 container before each test and seeds it.
+     *
+     * <p>The container joins the shared {@code NETWORK} under the alias
+     * {@code postgresql} and is launched with
+     * {@code max_prepared_transactions=100} (presumably required by the XA
+     * test; confirm). Once it is up, the JDBC driver class is loaded, the
+     * tables are created through {@link #initializeJdbcTable()} (retried via
+     * Awaitility with exceptions ignored, polling every 500 ms for at most
+     * 10 seconds) and the source table is filled by
+     * {@link #batchInsertData()}.
+     *
+     * @throws Exception if the driver class cannot be loaded or startup fails
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws Exception {
+        pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withCommand("postgres -c max_prepared_transactions=100")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(pg)).join();
+        LOGGER.info("Postgres container started");
+        Class.forName(pg.getDriverClassName());
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(10, TimeUnit.SECONDS)
+            .untilAsserted(() -> initializeJdbcTable());
+        batchInsertData();
+    }
+
+    /**
+     * Creates the {@code source} and {@code sink} tables, each with a
+     * {@code bigserial} primary key, a {@code char(10)} name and an
+     * {@code INT} age.
+     *
+     * @throws RuntimeException if either table cannot be created
+     */
+    private void initializeJdbcTable() {
+        String source = "create table source(\n" +
+            "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+            "name char(10),\n" +
+            "age INT\n" +
+            ")";
+        String sink = "create table sink(\n" +
+            "user_id bigserial NOT NULL PRIMARY KEY,\n" +
+            "name char(10),\n" +
+            "age INT\n" +
+            ")";
+        // The statement is a managed resource so it is closed even when one
+        // of the executions throws.
+        try (Connection connection = DriverManager.getConnection(
+                 pg.getJdbcUrl(), pg.getUsername(), pg.getPassword());
+             Statement statement = connection.createStatement()) {
+            statement.execute(source);
+            statement.execute(sink);
+        } catch (SQLException e) {
+            // The original message said "Mysql", copied from the MySQL test.
+            throw new RuntimeException("Initializing PostgreSQL table failed!", e);
+        }
+    }
+
+    /**
+     * Inserts every row of {@link #generateTestDataset()} into the
+     * {@code source} table as one JDBC batch inside a single transaction.
+     *
+     * @throws RuntimeException if the batch insert or the commit fails
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() {
+        String sql = "insert into source(name, age) values(?,?)";
+        try (Connection connection = DriverManager.getConnection(
+                 pg.getJdbcUrl(), pg.getUsername(), pg.getPassword())) {
+            connection.setAutoCommit(false);
+            // Close the prepared statement explicitly instead of relying on
+            // the connection being closed.
+            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+                for (List<?> row : generateTestDataset()) {
+                    preparedStatement.setString(1, (String) row.get(0));
+                    preparedStatement.setInt(2, (Integer) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    /**
+     * Runs the plain PostgreSQL source-to-sink job and expects the sink to
+     * hold the generated dataset, in order.
+     */
+    @Test
+    public void testJdbcPostgresSourceAndSink() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_postgres_source_and_sink.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+        final List<List> expectedRows = generateTestDataset();
+        final List<List> actualRows = queryResult();
+        Assertions.assertIterableEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * Runs the parallel PostgreSQL job and expects the sink, once ordered by
+     * age, to hold the generated dataset.
+     */
+    @Test
+    public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_postgres_source_and_sink_parallel.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // Rows are read in parallel and may arrive out of order, so sort
+        // them by age before comparing.
+        final List<List> actualRows = queryResult();
+        actualRows.sort(Comparator.comparing(row -> (Integer) row.get(1)));
+        Assertions.assertIterableEquals(generateTestDataset(), actualRows);
+    }
+
+    /**
+     * Runs the parallel PostgreSQL job with partition bounds 1..50 and
+     * expects the sink, once ordered by age, to hold the first 50 generated
+     * rows.
+     */
+    @Test
+    public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // Rows are read in parallel and may arrive out of order, so sort
+        // them by age before comparing.
+        final List<List> actualRows = queryResult();
+        actualRows.sort(Comparator.comparing(row -> (Integer) row.get(1)));
+
+        // lower=1 upper=50
+        final List<List> firstFifty = generateTestDataset().subList(0, 50);
+        Assertions.assertIterableEquals(firstFifty, actualRows);
+    }
+
+    /**
+     * Runs the PostgreSQL job from the XA config and expects the sink to
+     * hold the generated dataset, in order.
+     */
+    @Test
+    public void testJdbcPostgresSourceAndSinkXA() throws Exception {
+        final String jobConfig = "/jdbc/jdbc_postgres_source_and_sink_xa.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        final List<List> expectedRows = generateTestDataset();
+        final List<List> actualRows = queryResult();
+        Assertions.assertIterableEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * Reads all rows of the {@code sink} table.
+     *
+     * <p>Spaces are removed from the name because the column is
+     * {@code char(10)}.
+     *
+     * @return one {@code [name, age]} list per row, in the order returned by
+     *         the database
+     * @throws RuntimeException if the query fails
+     */
+    private List<List> queryResult() {
+        List<List> result = new ArrayList<>();
+        String sql = "select name , age from sink ";
+        // Statement and result set are managed resources so they are closed
+        // explicitly rather than left to the connection.
+        try (Connection connection = DriverManager.getConnection(
+                 pg.getJdbcUrl(), pg.getUsername(), pg.getPassword());
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+            while (resultSet.next()) {
+                result.add(
+                    Arrays.asList(
+                        resultSet.getString(1).replace(" ", ""),
+                        resultSet.getInt(2)
+                    )
+                );
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Query result data failed!", e);
+        }
+        return result;
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 1000; i++) {
+            rows.add(
+                Arrays.asList(
+                    String.format("user_%s", i),
+                    i
+                ));
+        }
+        return rows;
+    }
+
+    /** Stops the PostgreSQL container after each test, if one was created. */
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (pg == null) {
+            return;
+        }
+        pg.stop();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
new file mode 100644
index 000000000..79c03996f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/mysql_init.conf
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+source_table = """ CREATE TABLE source ( `user_id` INT PRIMARY KEY NOT NULL 
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+sink_table = """ CREATE TABLE sink ( `user_id` INT PRIMARY KEY NOT NULL 
AUTO_INCREMENT, `name` CHAR ( 10 ), `age` INT ) """
+
+type_source_table = """
+CREATE TABLE `type_source_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+       `year` YEAR DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+type_sink_table = """
+CREATE TABLE `type_sink_table` (
+       `binary` BINARY ( 64 ) DEFAULT NULL,
+       `blob` BLOB,
+       `long_varbinary` MEDIUMBLOB,
+       `longblob` LONGBLOB,
+       `tinyblob` TINYBLOB,
+       `varbinary` VARBINARY ( 100 ) DEFAULT NULL,
+
+       `tinyint` TINYINT DEFAULT NULL,
+       `tinyint_unsigned` TINYINT UNSIGNED DEFAULT NULL,
+       `smallint` SMALLINT DEFAULT NULL,
+       `smallint_unsigned` SMALLINT UNSIGNED DEFAULT NULL,
+       `mediumint` MEDIUMINT DEFAULT NULL,
+       `mediumint_unsigned` MEDIUMINT UNSIGNED DEFAULT NULL,
+       `int` INT DEFAULT NULL,
+       `int_unsigned` INT UNSIGNED DEFAULT NULL,
+       `integer` INT DEFAULT NULL,
+       `integer_unsigned` INT UNSIGNED DEFAULT NULL,
+       `bigint` BIGINT DEFAULT NULL,
+       `bigint_unsigned` BIGINT UNSIGNED DEFAULT NULL,
+       `numeric` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `decimal` DECIMAL ( 10, 0 ) DEFAULT NULL,
+       `float` FLOAT DEFAULT NULL,
+       `double` DOUBLE DEFAULT NULL,
+       `double_precision` DOUBLE DEFAULT NULL,
+
+       `longtext` LONGTEXT,
+       `mediumtext` MEDIUMTEXT,
+       `text` text,
+       `tinytext` TINYTEXT,
+       `varchar` VARCHAR ( 100 ) DEFAULT NULL,
+       `json` json DEFAULT NULL,
+
+       `date` date DEFAULT NULL,
+       `datetime` datetime DEFAULT NULL,
+       `year` YEAR DEFAULT NULL,
+    `timestamp` TIMESTAMP NULL DEFAULT NULL
+)
+"""
+
+insert_type_source_table_sql = """
+INSERT INTO `type_source_table` (
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `year`,
+       `timestamp`
+)
+VALUES
+       (
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               NULL,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               1,
+               'a',
+               'a',
+               'a',
+               'a',
+               'a',
+               '{}',
+               '2022-09-22',
+               '2022-09-22 15:07:44',
+               2022,
+       '2022-09-22 15:07:55'
+       )
+"""
+
+check_type_sink_table_sql = """
+SELECT
+       count( 1 )
+FROM
+       ( SELECT * FROM type_source_table UNION ALL SELECT * FROM 
type_sink_table ) a
+GROUP BY
+       `binary`,
+       `blob`,
+       `long_varbinary`,
+       `longblob`,
+       `tinyblob`,
+       `varbinary`,
+       `tinyint`,
+       `tinyint_unsigned`,
+       `smallint`,
+       `smallint_unsigned`,
+       `mediumint`,
+       `mediumint_unsigned`,
+       `int`,
+       `int_unsigned`,
+       `integer`,
+       `integer_unsigned`,
+       `bigint`,
+       `bigint_unsigned`,
+       `numeric`,
+       `decimal`,
+       `float`,
+       `double`,
+       `double_precision`,
+       `longtext`,
+       `mediumtext`,
+       `text`,
+       `tinytext`,
+       `varchar`,
+       `json`,
+       `date`,
+       `datetime`,
+       `year`,
+       `timestamp`
+"""
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
new file mode 100644
index 000000000..06ab2214e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
new file mode 100644
index 000000000..08110f747
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_datatype.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select * from type_source_table"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = """ INSERT INTO `type_sink_table` (
+                       `binary`,
+                       `blob`,
+                       `long_varbinary`,
+                       `longblob`,
+                       `tinyblob`,
+                       `varbinary`,
+                       `tinyint`,
+                       `tinyint_unsigned`,
+                       `smallint`,
+                       `smallint_unsigned`,
+                       `mediumint`,
+                       `mediumint_unsigned`,
+                       `int`,
+                       `int_unsigned`,
+                       `integer`,
+                       `integer_unsigned`,
+                       `bigint`,
+                       `bigint_unsigned`,
+                       `numeric`,
+                       `decimal`,
+                       `float`,
+                       `double`,
+                       `double_precision`,
+                       `longtext`,
+                       `mediumtext`,
+                       `text`,
+                       `tinytext`,
+                       `varchar`,
+                       `json`,
+                       `date`,
+                       `datetime`,
+                       `year`,
+                       `timestamp`
+                 )
+                 VALUES
+                       (
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                               ?,
+                           ?
+                       )"""
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
new file mode 100644
index 000000000..53fc85b44
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..a682057e6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+        partition_lower_bound=1
+        partition_upper_bound=50
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
new file mode 100644
index 000000000..d3053420c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_xa.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 1000
+}
+
+source {
+    jdbc{
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+       user = "root"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:mysql://mysql:3306/test"
+        driver = "com.mysql.cj.jdbc.Driver"
+        user = "root"
+        password = "test"
+
+        max_retries = 0
+        query = "insert into sink(name,age) values(?,?)"
+
+        # Non-root users must be granted the XA_RECOVER_ADMIN privilege when is_exactly_once = "true"
+        is_exactly_once = "true"
+
+        xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+        max_commit_attempts = 3
+        transaction_timeout_sec = 86400
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
new file mode 100644
index 000000000..0e5642551
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
new file mode 100644
index 000000000..7523ac9f6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
new file mode 100644
index 000000000..875eeb3e8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 3
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select user_id,  name , age from source"
+        partition_column= "user_id"
+
+        result_table_name = "jdbc"
+        partition_lower_bound=1
+        partition_upper_bound=50
+    }
+}
+
+transform {
+    sql {
+      sql = "select name,age from jdbc"
+    }
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+
+        user = "test"
+        password = "test"
+        connection_check_timeout_sec = 100
+        query = "insert into sink(name,age) values(?,?)"
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
new file mode 100644
index 000000000..f632d63e7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_xa.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  source.parallelism = 1
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 1000
+}
+
+source {
+    jdbc{
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+        query = "select name , age from source"
+    }
+}
+
+transform {
+}
+
+sink {
+    jdbc {
+        url = "jdbc:postgresql://postgresql:5432/test"
+        driver = "org.postgresql.Driver"
+        user = "test"
+        password = "test"
+
+        max_retries = 0
+        query = "insert into sink(name,age) values(?,?)"
+
+        is_exactly_once = "true"
+
+        xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+        max_commit_attempts = 3
+        transaction_timeout_sec = 86400
+    }
+}
\ No newline at end of file

Reply via email to