This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 463cafe38 [api-draft][connector] Add SeaTunnel jdbc source (#2048)
463cafe38 is described below

commit 463cafe38e35c62318b3b3a11a477a8b1786c452
Author: ic4y <[email protected]>
AuthorDate: Mon Jun 27 13:15:51 2022 +0800

    [api-draft][connector] Add SeaTunnel jdbc source (#2048)
    
    * Add SeaTunnel jdbc sink (#1946)
    
    * Add license head
    
    * fix checkStyle err
---
 seatunnel-connectors/plugin-mapping.properties     |   2 +
 .../seatunnel/jdbc/config/JdbcConfig.java          | 102 ++++++++++
 .../seatunnel/jdbc/config/JdbcSinkOptions.java     |  43 ++++
 .../seatunnel/jdbc/config/JdbcSourceOptions.java   |  77 ++++++++
 .../seatunnel/jdbc/internal/JdbcInputFormat.java   | 220 +++++++++++++++++++++
 .../seatunnel/jdbc/internal/JdbcOutputFormat.java  |  22 +--
 .../jdbc/internal/connection/DataSourceUtils.java  |  20 +-
 .../connection/SimpleJdbcConnectionProvider.java   |   6 +-
 .../converter/AbstractJdbcRowConverter.java        |  92 +++++++++
 .../jdbc/internal/converter/JdbcRowConverter.java  |  40 ++++
 .../jdbc/internal/dialect/JdbcDialect.java         |  52 +++++
 .../jdbc/internal/dialect/JdbcDialectFactory.java  |  40 ++++
 .../jdbc/internal/dialect/JdbcDialectLoader.java   | 102 ++++++++++
 .../internal/dialect/JdbcDialectTypeMapper.java    |  34 ++++
 .../dialect/mysql/MySqlDialectFactory.java         |  38 ++++
 .../internal/dialect/mysql/MySqlTypeMapper.java    | 167 ++++++++++++++++
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  39 ++++
 .../dialect/mysql/MysqlJdbcRowConverter.java       |  38 ++++
 .../internal/options/JdbcConnectionOptions.java    | 200 +++++++++++++++++++
 .../split/JdbcGenericParameterValuesProvider.java  |  39 ++++
 .../JdbcNumericBetweenParametersProvider.java      | 116 +++++++++++
 .../split/JdbcParameterValuesProvider.java         |  31 +++
 .../seatunnel/jdbc/internal/xa/XaFacade.java       |   6 +-
 .../jdbc/internal/xa/XaFacadeImplAutoLoad.java     |  15 +-
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |  13 +-
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  22 +--
 .../jdbc/sink/JdbcSinkAggregatedCommitter.java     |  12 +-
 .../seatunnel/jdbc/sink/JdbcSinkCommitter.java     |  12 +-
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  10 +-
 .../seatunnel/jdbc/source/JdbcSource.java          | 211 ++++++++++++++++++++
 .../seatunnel/jdbc/source/JdbcSourceReader.java    |  97 +++++++++
 .../seatunnel/jdbc/source/JdbcSourceSplit.java     |  36 ++++
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 100 ++++++++++
 .../seatunnel/jdbc/source/PartitionParameter.java  |  32 +++
 .../seatunnel/jdbc/state/JdbcSourceState.java      |  23 +++
 35 files changed, 2040 insertions(+), 69 deletions(-)

diff --git a/seatunnel-connectors/plugin-mapping.properties 
b/seatunnel-connectors/plugin-mapping.properties
index 992c399a0..d06354594 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -92,3 +92,5 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
 seatunnel.source.Http = seatunnel-connector-seatunnel-http
 seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
 seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
+seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
+seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
new file mode 100644
index 000000000..4a705247b
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.Serializable;
+
+public class JdbcConfig implements Serializable {
+
+    public static final String URL = "url";
+
+    public static final String DRIVER = "driver";
+
+    public static final String CONNECTION_CHECK_TIMEOUT_SEC = 
"connection_check_timeout_sec";
+
+    public static final String MAX_RETRIES = "max_retries";
+
+    public static final String USER = "user";
+
+    public static final String PASSWORD = "password";
+
+    public static final String QUERY = "query";
+
+    public static final String PARALLELISM = "parallelism";
+
+
+    public static final String BATCH_SIZE = "batch_size";
+
+    public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+
+
+    public static final String IS_EXACTLY_ONCE = "is_exactly_once";
+
+    public static final String XA_DATA_SOURCE_CLASS_NAME = 
"xa_data_source_class_name";
+
+
+    public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts";
+
+    public static final String TRANSACTION_TIMEOUT_SEC = 
"transaction_timeout_sec";
+
+
+    //source config
+    public static final String PARTITION_COLUMN = "partition_column";
+    public static final String PARTITION_UPPER_BOUND = "partition_upper_bound";
+    public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";
+
+    public static JdbcConnectionOptions buildJdbcConnectionOptions(Config 
config) {
+
+        JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions();
+        jdbcOptions.url = config.getString(JdbcConfig.URL);
+        jdbcOptions.driverName = config.getString(JdbcConfig.DRIVER);
+        if (config.hasPath(JdbcConfig.USER)) {
+            jdbcOptions.username = config.getString(JdbcConfig.USER);
+        }
+        if (config.hasPath(JdbcConfig.PASSWORD)) {
+            jdbcOptions.password = config.getString(JdbcConfig.PASSWORD);
+        }
+        jdbcOptions.query = config.getString(JdbcConfig.QUERY);
+
+        if (config.hasPath(JdbcConfig.MAX_RETRIES)) {
+            jdbcOptions.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES);
+        }
+        if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) {
+            jdbcOptions.connectionCheckTimeoutSeconds = 
config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC);
+        }
+        if (config.hasPath(JdbcConfig.BATCH_SIZE)) {
+            jdbcOptions.batchSize = config.getInt(JdbcConfig.BATCH_SIZE);
+        }
+        if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) {
+            jdbcOptions.batchIntervalMs = 
config.getInt(JdbcConfig.BATCH_INTERVAL_MS);
+        }
+
+        if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) {
+            jdbcOptions.xaDataSourceClassName = 
config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME);
+            if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) {
+                jdbcOptions.maxCommitAttempts = 
config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS);
+            }
+            if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) {
+                jdbcOptions.transactionTimeoutSec = 
config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
+            }
+        }
+        return jdbcOptions;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
new file mode 100644
index 000000000..964c859e5
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class JdbcSinkOptions implements Serializable {
+    private JdbcConnectionOptions jdbcConnectionOptions;
+    private boolean isExactlyOnce;
+
+    public JdbcSinkOptions(Config config) {
+        this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
+        if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE) && 
config.getBoolean(JdbcConfig.IS_EXACTLY_ONCE)) {
+            this.isExactlyOnce = true;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
new file mode 100644
index 000000000..69e3ae454
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+@Data
+@AllArgsConstructor
+public class JdbcSourceOptions implements Serializable {
+    private JdbcConnectionOptions jdbcConnectionOptions;
+    private String partitionColumn;
+    private Long partitionUpperBound;
+    private Long partitionLowerBound;
+
+    private Integer parallelism;
+
+    public JdbcSourceOptions(Config config) {
+        this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
+        if (config.hasPath(JdbcConfig.PARTITION_COLUMN)) {
+            this.partitionColumn = 
config.getString(JdbcConfig.PARTITION_COLUMN);
+        }
+        if (config.hasPath(JdbcConfig.PARTITION_UPPER_BOUND)) {
+            this.partitionUpperBound = 
config.getLong(JdbcConfig.PARTITION_UPPER_BOUND);
+        }
+        if (config.hasPath(JdbcConfig.PARTITION_LOWER_BOUND)) {
+            this.partitionLowerBound = 
config.getLong(JdbcConfig.PARTITION_LOWER_BOUND);
+        }
+        if (config.hasPath(JdbcConfig.PARALLELISM)) {
+            this.parallelism = config.getInt(JdbcConfig.PARALLELISM);
+        }
+    }
+
+    public JdbcConnectionOptions getJdbcConnectionOptions() {
+        return jdbcConnectionOptions;
+    }
+
+    public Optional<String> getPartitionColumn() {
+        return Optional.ofNullable(partitionColumn);
+    }
+
+    public Optional<Long> getPartitionUpperBound() {
+        return Optional.ofNullable(partitionUpperBound);
+    }
+
+    public Optional<Long> getPartitionLowerBound() {
+        return Optional.ofNullable(partitionLowerBound);
+    }
+
+    public Optional<Integer> getParallelism() {
+        return Optional.ofNullable(parallelism);
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
new file mode 100644
index 000000000..1baf17256
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * InputFormat to read data from a database and generate Rows. The InputFormat 
has to be configured
+ * using the supplied InputFormatBuilder. A valid RowTypeInfo must be properly 
configured in the
+ * builder
+ */
+
+public class JdbcInputFormat implements Serializable {
+
+    protected static final long serialVersionUID = 2L;
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcInputFormat.class);
+
+    protected JdbcConnectionProvider connectionProvider;
+    protected JdbcRowConverter jdbcRowConverter;
+    protected String queryTemplate;
+    protected SeaTunnelRowType typeInfo;
+    protected int fetchSize;
+    // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+    protected Boolean autoCommit;
+
+    protected transient PreparedStatement statement;
+    protected transient ResultSet resultSet;
+
+    protected boolean hasNext;
+
+    public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
+                           JdbcRowConverter jdbcRowConverter,
+                           SeaTunnelRowType typeInfo,
+                           String queryTemplate,
+                           int fetchSize,
+                           Boolean autoCommit
+    ) {
+        this.connectionProvider = connectionProvider;
+        this.jdbcRowConverter = jdbcRowConverter;
+        this.typeInfo = typeInfo;
+        this.queryTemplate = queryTemplate;
+        this.fetchSize = fetchSize;
+        this.autoCommit = autoCommit;
+    }
+
+    public void openInputFormat() {
+        // called once per inputFormat (on open)
+        try {
+            Connection dbConn = connectionProvider.getOrEstablishConnection();
+
+            // set autoCommit mode only if it was explicitly configured.
+            // keep connection default otherwise.
+            if (autoCommit != null) {
+                dbConn.setAutoCommit(autoCommit);
+            }
+
+            statement = dbConn.prepareStatement(queryTemplate);
+            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
+                statement.setFetchSize(fetchSize);
+            }
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("open() failed." + 
se.getMessage(), se);
+        } catch (ClassNotFoundException cnfe) {
+            throw new IllegalArgumentException(
+                "JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+        }
+    }
+
+    public void closeInputFormat() {
+        // called once per inputFormat (on close)
+        try {
+            if (statement != null) {
+                statement.close();
+            }
+        } catch (SQLException se) {
+            LOG.info("Inputformat Statement couldn't be closed - " + 
se.getMessage());
+        } finally {
+            statement = null;
+        }
+
+        connectionProvider.closeConnection();
+
+    }
+
+    /**
+     * Connects to the source database and executes the query
+     *
+     * @param inputSplit which is ignored if this InputFormat is executed as a 
non-parallel source,
+     *                   a "hook" to the query parameters otherwise (using its 
<i>parameterId</i>)
+     * @throws IOException if there's an error during the execution of the 
query
+     */
+    public void open(JdbcSourceSplit inputSplit) throws IOException {
+        try {
+            Object[] parameterValues = inputSplit.getParameterValues();
+            if (parameterValues != null) {
+                for (int i = 0; i < parameterValues.length; i++) {
+                    Object param = parameterValues[i];
+                    if (param instanceof String) {
+                        statement.setString(i + 1, (String) param);
+                    } else if (param instanceof Long) {
+                        statement.setLong(i + 1, (Long) param);
+                    } else if (param instanceof Integer) {
+                        statement.setInt(i + 1, (Integer) param);
+                    } else if (param instanceof Double) {
+                        statement.setDouble(i + 1, (Double) param);
+                    } else if (param instanceof Boolean) {
+                        statement.setBoolean(i + 1, (Boolean) param);
+                    } else if (param instanceof Float) {
+                        statement.setFloat(i + 1, (Float) param);
+                    } else if (param instanceof BigDecimal) {
+                        statement.setBigDecimal(i + 1, (BigDecimal) param);
+                    } else if (param instanceof Byte) {
+                        statement.setByte(i + 1, (Byte) param);
+                    } else if (param instanceof Short) {
+                        statement.setShort(i + 1, (Short) param);
+                    } else if (param instanceof Date) {
+                        statement.setDate(i + 1, (Date) param);
+                    } else if (param instanceof Time) {
+                        statement.setTime(i + 1, (Time) param);
+                    } else if (param instanceof Timestamp) {
+                        statement.setTimestamp(i + 1, (Timestamp) param);
+                    } else if (param instanceof Array) {
+                        statement.setArray(i + 1, (Array) param);
+                    } else {
+                        // extends with other types if needed
+                        throw new IllegalArgumentException(
+                            "open() failed. Parameter "
+                                + i
+                                + " of type "
+                                + param.getClass()
+                                + " is not handled (yet).");
+                    }
+                }
+            }
+            resultSet = statement.executeQuery();
+            hasNext = resultSet.next();
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("open() failed." + 
se.getMessage(), se);
+        }
+    }
+
+    /**
+     * Closes all resources used.
+     *
+     * @throws IOException Indicates that a resource could not be closed.
+     */
+    public void close() throws IOException {
+        if (resultSet == null) {
+            return;
+        }
+        try {
+            resultSet.close();
+        } catch (SQLException se) {
+            LOG.info("Inputformat ResultSet couldn't be closed - " + 
se.getMessage());
+        }
+    }
+
+    /**
+     * Checks whether all data has been read.
+     *
+     * @return boolean value indication whether all data has been read.
+     */
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    /**
+     * Convert a row of data to seatunnelRow
+     */
+    public SeaTunnelRow nextRecord() throws IOException {
+        try {
+            if (!hasNext) {
+                return null;
+            }
+            SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, 
resultSet.getMetaData(), typeInfo);
+            // update hasNext after we've read the record
+            hasNext = resultSet.next();
+            return seaTunnelRow;
+        } catch (SQLException se) {
+            throw new IOException("Couldn't read data - " + se.getMessage(), 
se);
+        } catch (NullPointerException npe) {
+            throw new IOException("Couldn't access resultSet", npe);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index f7738140c..afa9ebbba 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -21,7 +21,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -51,7 +51,7 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcOutputFormat.class);
 
-    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final JdbcConnectionOptions jdbcConnectionOptions;
     private final StatementExecutorFactory<E> statementExecutorFactory;
 
     private transient E jdbcStatementExecutor;
@@ -64,10 +64,10 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
 
     public JdbcOutputFormat(
         JdbcConnectionProvider connectionProvider,
-        JdbcConnectorOptions jdbcConnectorOptions,
+        JdbcConnectionOptions jdbcConnectionOptions,
         StatementExecutorFactory<E> statementExecutorFactory) {
         this.connectionProvider = checkNotNull(connectionProvider);
-        this.jdbcConnectorOptions = checkNotNull(jdbcConnectorOptions);
+        this.jdbcConnectionOptions = checkNotNull(jdbcConnectionOptions);
         this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
     }
 
@@ -85,7 +85,7 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
         }
         jdbcStatementExecutor = 
createAndOpenStatementExecutor(statementExecutorFactory);
 
-        if (jdbcConnectorOptions.getBatchIntervalMs() != 0 && 
jdbcConnectorOptions.getBatchSize() != 1) {
+        if (jdbcConnectionOptions.getBatchIntervalMs() != 0 && 
jdbcConnectionOptions.getBatchSize() != 1) {
             this.scheduler =
                 Executors.newScheduledThreadPool(
                     1, runnable -> {
@@ -109,8 +109,8 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
                             }
                         }
                     },
-                    jdbcConnectorOptions.getBatchIntervalMs(),
-                    jdbcConnectorOptions.getBatchIntervalMs(),
+                    jdbcConnectionOptions.getBatchIntervalMs(),
+                    jdbcConnectionOptions.getBatchIntervalMs(),
                     TimeUnit.MILLISECONDS);
         }
     }
@@ -140,8 +140,8 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
         try {
             addToBatch(record);
             batchCount++;
-            if (jdbcConnectorOptions.getBatchSize() > 0
-                && batchCount >= jdbcConnectorOptions.getBatchSize()) {
+            if (jdbcConnectionOptions.getBatchSize() > 0
+                && batchCount >= jdbcConnectionOptions.getBatchSize()) {
                 flush();
             }
         }
@@ -159,7 +159,7 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
         throws IOException {
         checkFlushException();
         final int sleepMs = 1000;
-        for (int i = 0; i <= jdbcConnectorOptions.getMaxRetries(); i++) {
+        for (int i = 0; i <= jdbcConnectionOptions.getMaxRetries(); i++) {
             try {
                 attemptFlush();
                 batchCount = 0;
@@ -167,7 +167,7 @@ public class JdbcOutputFormat<I, E extends 
JdbcBatchStatementExecutor<I>>
             }
             catch (SQLException e) {
                 LOG.error("JDBC executeBatch error, retry times = {}", i, e);
-                if (i >= jdbcConnectorOptions.getMaxRetries()) {
+                if (i >= jdbcConnectionOptions.getMaxRetries()) {
                     ExceptionUtils.rethrowIOException(e);
                 }
                 try {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
index 1e2e213a7..43ac649a3 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 
 import com.google.common.base.CaseFormat;
 import lombok.NonNull;
@@ -39,21 +39,21 @@ public class DataSourceUtils
 
     private static final String SETTER_PREFIX = "set";
 
-    public static CommonDataSource buildCommonDataSource(@NonNull 
JdbcConnectorOptions jdbcConnectorOptions)
+    public static CommonDataSource buildCommonDataSource(@NonNull 
JdbcConnectionOptions jdbcConnectionOptions)
         throws InvocationTargetException, IllegalAccessException {
-        CommonDataSource dataSource = (CommonDataSource) 
loadDataSource(jdbcConnectorOptions.getXaDataSourceClassName());
-        setProperties(dataSource, 
buildDatabaseAccessConfig(jdbcConnectorOptions));
+        CommonDataSource dataSource = (CommonDataSource) 
loadDataSource(jdbcConnectionOptions.getXaDataSourceClassName());
+        setProperties(dataSource, 
buildDatabaseAccessConfig(jdbcConnectionOptions));
         return dataSource;
     }
 
-    private static Map<String, Object> 
buildDatabaseAccessConfig(JdbcConnectorOptions jdbcConnectorOptions) {
+    private static Map<String, Object> 
buildDatabaseAccessConfig(JdbcConnectionOptions jdbcConnectionOptions) {
         HashMap<String, Object> accessConfig = new HashMap<>();
-        accessConfig.put("url", jdbcConnectorOptions.getUrl());
-        if (jdbcConnectorOptions.getUsername().isPresent()) {
-            accessConfig.put("user", jdbcConnectorOptions.getUsername().get());
+        accessConfig.put("url", jdbcConnectionOptions.getUrl());
+        if (jdbcConnectionOptions.getUsername().isPresent()) {
+            accessConfig.put("user", 
jdbcConnectionOptions.getUsername().get());
         }
-        if (jdbcConnectorOptions.getPassword().isPresent()) {
-            accessConfig.put("password", 
jdbcConnectorOptions.getPassword().get());
+        if (jdbcConnectionOptions.getPassword().isPresent()) {
+            accessConfig.put("password", 
jdbcConnectionOptions.getPassword().get());
         }
 
         return accessConfig;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index 7e69e990e..f6e4d56f3 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 
 import lombok.NonNull;
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class SimpleJdbcConnectionProvider
 
     private static final long serialVersionUID = 1L;
 
-    private final JdbcConnectorOptions jdbcOptions;
+    private final JdbcConnectionOptions jdbcOptions;
 
     private transient Driver loadedDriver;
     private transient Connection connection;
@@ -60,7 +60,7 @@ public class SimpleJdbcConnectionProvider
         DriverManager.getDrivers();
     }
 
-    public SimpleJdbcConnectionProvider(@NonNull JdbcConnectorOptions 
jdbcOptions) {
+    public SimpleJdbcConnectionProvider(@NonNull JdbcConnectionOptions 
jdbcOptions) {
         this.jdbcOptions = jdbcOptions;
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
new file mode 100644
index 000000000..57d3501b0
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for all converters that convert between JDBC object and 
Seatunnel internal object.
+ */
+public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+
+    public abstract String converterName();
+
+    public AbstractJdbcRowConverter() {
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:Indentation")
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, 
SeaTunnelRowType typeInfo) throws SQLException {
+
+        List<Object> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+
+        for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+            Object seatunnelField;
+            SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+            if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBoolean(i);
+            } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getByte(i);
+            } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getShort(i);
+            } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getInt(i);
+            } else if (BasicType.BIG_INT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getObject(i);
+            } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getLong(i);
+            } else if (BasicType.BIG_DECIMAL_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBigDecimal(i);
+            } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getFloat(i);
+            } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDouble(i);
+            } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getString(i);
+            } else if (BasicType.DATE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getObject(i);
+            } else if 
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getTime(i).toLocalTime();
+            } else if 
(LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getDate(i).toLocalDate();
+            } else if 
(LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getTimestamp(i).toLocalDateTime();
+            } else if 
(PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE.equals(seaTunnelDataType)) {
+                seatunnelField = rs.getBytes(i);
+            } else {
+                throw new IllegalStateException("Unexpected value: " + 
seaTunnelDataType);
+            }
+
+            fields.add(seatunnelField);
+        }
+
+        return new SeaTunnelRow(fields.toArray());
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
new file mode 100644
index 000000000..b50d149b6
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/**
+ * Converter that is responsible to convert between JDBC object and Seatunnel 
data
+ * structure {@link SeaTunnelRow}.
+ */
+public interface JdbcRowConverter extends Serializable {
+
+    /**
+     * Convert data retrieved from {@link ResultSet} to internal {@link 
SeaTunnelRow}.
+     *
+     * @param rs ResultSet from JDBC
+     */
+    SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, 
SeaTunnelRowType typeInfo) throws SQLException;
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
new file mode 100644
index 000000000..155d8d4c2
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+
+import java.io.Serializable;
+
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system. 
Dialects should be immutable
+ * and stateless.
+ */
+
+public interface JdbcDialect extends Serializable {
+
+    /**
+     * Get the name of jdbc dialect.
+     *
+     * @return the dialect name.
+     */
+    String dialectName();
+
+    /**
+     * Get converter that convert jdbc object to seatunnel internal object.
+     *
+     * @return a row converter for the database
+     */
+    JdbcRowConverter getRowConverter();
+
+
+    /**
+     * get jdbc meta-information type to seatunnel data type mapper.
+     * @return a type mapper for the database
+     */
+    JdbcDialectTypeMapper getJdbcDialectTypeMapper();
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
new file mode 100644
index 000000000..eabb406ae
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+/**
+ * A factory to create a specific {@link JdbcDialect}
+ * @see JdbcDialect
+ */
+
+public interface JdbcDialectFactory {
+
+    /**
+     * Retrieves whether the dialect thinks that it can open a connection to 
the given URL.
+     * Typically, dialects will return <code>true</code> if they understand 
the sub-protocol
+     * specified in the URL and <code>false</code> if they do not.
+     *
+     * @param url the URL of the database
+     * @return <code>true</code> if this dialect understands the given URL; 
<code>false</code>
+     *     otherwise.
+     */
+    boolean acceptsURL(String url);
+
+    /** @return Creates a new instance of the {@link JdbcDialect}. */
+    JdbcDialect create();
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
new file mode 100644
index 000000000..cd8efa909
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/** Utility for working with {@link JdbcDialect}. */
+public final class JdbcDialectLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcDialectLoader.class);
+
+    private JdbcDialectLoader() {}
+
+    /**
+     * Loads the unique JDBC Dialect that can handle the given database url.
+     *
+     * @param url A database URL.
+     * @throws IllegalStateException if the loader cannot find exactly one 
dialect that can
+     *     unambiguously process the given database URL.
+     * @return The loaded dialect.
+     */
+    public static JdbcDialect load(String url) {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
+
+        if (foundFactories.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Could not find any jdbc dialect factories that 
implement '%s' in the classpath.",
+                            JdbcDialectFactory.class.getName()));
+        }
+
+        final List<JdbcDialectFactory> matchingFactories =
+                foundFactories.stream().filter(f -> 
f.acceptsURL(url)).collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Could not find any jdbc dialect factory that can 
handle url '%s' that implements '%s' in the classpath.\n\n"
+                                    + "Available factories are:\n\n"
+                                    + "%s",
+                            url,
+                            JdbcDialectFactory.class.getName(),
+                            foundFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+        if (matchingFactories.size() > 1) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Multiple jdbc dialect factories can handle url 
'%s' that implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            url,
+                            JdbcDialectFactory.class.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return matchingFactories.get(0).create();
+    }
+
+    private static List<JdbcDialectFactory> discoverFactories(ClassLoader 
classLoader) {
+        try {
+            final List<JdbcDialectFactory> result = new LinkedList<>();
+            ServiceLoader.load(JdbcDialectFactory.class, classLoader)
+                    .iterator()
+                    .forEachRemaining(result::add);
+            return result;
+        } catch (ServiceConfigurationError e) {
+            LOG.error("Could not load service provider for jdbc dialects 
factory.", e);
+            throw new RuntimeException(
+                    "Could not load service provider for jdbc dialects 
factory.", e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
new file mode 100644
index 000000000..acf296801
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/** Separate the jdbc meta-information type to SeaTunnelDataType into the 
interface. */
+public interface JdbcDialectTypeMapper extends Serializable {
+
+    /**
+     * Convert ResultSetMetaData to Seatunnel data type {@link 
SeaTunnelDataType}.
+     */
+    SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+        throws SQLException;
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
new file mode 100644
index 000000000..56a7f8db6
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link MysqlDialect}. */
+
+@AutoService(JdbcDialectFactory.class)
+public class MySqlDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:mysql:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new MysqlDialect();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
new file mode 100644
index 000000000..3b853a2c3
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class MySqlTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcDialect.class);
+
+    // ============================data types=====================
+
+    private static final String MYSQL_UNKNOWN = "UNKNOWN";
+    private static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String MYSQL_TINYINT = "TINYINT";
+    private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String MYSQL_SMALLINT = "SMALLINT";
+    private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT 
UNSIGNED";
+    private static final String MYSQL_INT = "INT";
+    private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String MYSQL_INTEGER = "INTEGER";
+    private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String MYSQL_BIGINT = "BIGINT";
+    private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String MYSQL_DECIMAL = "DECIMAL";
+    private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_FLOAT = "FLOAT";
+    private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String MYSQL_DOUBLE = "DOUBLE";
+    private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String MYSQL_CHAR = "CHAR";
+    private static final String MYSQL_VARCHAR = "VARCHAR";
+    private static final String MYSQL_TINYTEXT = "TINYTEXT";
+    private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String MYSQL_TEXT = "TEXT";
+    private static final String MYSQL_LONGTEXT = "LONGTEXT";
+    private static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String MYSQL_DATE = "DATE";
+    private static final String MYSQL_DATETIME = "DATETIME";
+    private static final String MYSQL_TIME = "TIME";
+    private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    private static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String MYSQL_TINYBLOB = "TINYBLOB";
+    private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String MYSQL_BLOB = "BLOB";
+    private static final String MYSQL_LONGBLOB = "LONGBLOB";
+    private static final String MYSQL_BINARY = "BINARY";
+    private static final String MYSQL_VARBINARY = "VARBINARY";
+    private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case MYSQL_TINYINT:
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return BasicType.INT_TYPE;
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return BasicType.LONG_TYPE;
+            case MYSQL_BIGINT_UNSIGNED:
+                return BasicType.BIG_INT_TYPE;
+            case MYSQL_DECIMAL:
+            case MYSQL_DECIMAL_UNSIGNED:
+                return BasicType.BIG_DECIMAL_TYPE;
+            case MYSQL_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", 
MYSQL_FLOAT_UNSIGNED);
+                return BasicType.FLOAT_TYPE;
+            case MYSQL_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", 
MYSQL_DOUBLE_UNSIGNED);
+                return BasicType.DOUBLE_TYPE;
+            case MYSQL_CHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_VARCHAR:
+            case MYSQL_JSON:
+                return BasicType.STRING_TYPE;
+            case MYSQL_LONGTEXT:
+                LOG.warn(
+                    "Type '{}' has a maximum precision of 536870911 in MySQL. "
+                        + "Due to limitations in the seatunnel type system, "
+                        + "the precision will be set to 2147483647.",
+                    MYSQL_LONGTEXT);
+                return BasicType.STRING_TYPE;
+
+            case MYSQL_YEAR:
+            case MYSQL_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case MYSQL_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case MYSQL_DATETIME:
+            case MYSQL_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            //Doesn't support binary yet
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+
+            case MYSQL_GEOMETRY:
+            case MYSQL_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support MySQL type '%s' on column '%s'  yet.",
+                        mysqlType, jdbcColumnName));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
new file mode 100644
index 000000000..3e4d77158
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class MysqlDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "MySQL";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new MysqlJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new MySqlTypeMapper();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
new file mode 100644
index 000000000..9e3b05c7e
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class MysqlJdbcRowConverter extends AbstractJdbcRowConverter {
+    @Override
+    public String converterName() {
+        return "MySql";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, 
SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
new file mode 100644
index 000000000..19865fed7
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+public class JdbcConnectionOptions
+    implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC = 30;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_BATCH_SIZE = 300;
+    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+    private static final int DEFAULT_TRANSACTION_TIMEOUT_SEC = -1;
+
+    public String url;
+    public String driverName;
+    public int connectionCheckTimeoutSeconds = 
DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+    public int maxRetries = DEFAULT_MAX_RETRIES;
+    public String username;
+    public String password;
+    public String query;
+
+    public int batchSize = DEFAULT_BATCH_SIZE;
+    public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+    public String xaDataSourceClassName;
+
+    public int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+
+    public int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+    public JdbcConnectionOptions() {
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getDriverName() {
+        return driverName;
+    }
+
+    public int getConnectionCheckTimeoutSeconds() {
+        return connectionCheckTimeoutSeconds;
+    }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public Optional<String> getUsername() {
+        return Optional.ofNullable(username);
+    }
+
+    public Optional<String> getPassword() {
+        return Optional.ofNullable(password);
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getBatchIntervalMs() {
+        return batchIntervalMs;
+    }
+
+    public String getXaDataSourceClassName() {
+        return xaDataSourceClassName;
+    }
+
+    public int getMaxCommitAttempts() {
+        return maxCommitAttempts;
+    }
+
+    public Optional<Integer> getTransactionTimeoutSec() {
+        return transactionTimeoutSec < 0 ? Optional.empty() : 
Optional.of(transactionTimeoutSec);
+    }
+
+    public static JdbcConnectionOptionsBuilder builder() {
+        return new JdbcConnectionOptionsBuilder();
+    }
+
+    public static final class JdbcConnectionOptionsBuilder {
+        private String url;
+        private String driverName;
+        private int connectionCheckTimeoutSeconds = 
DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+        private int maxRetries = DEFAULT_MAX_RETRIES;
+        private String username;
+        private String password;
+        private String query;
+        private int batchSize = DEFAULT_BATCH_SIZE;
+        private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+        private String xaDataSourceClassName;
+        private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+        private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+        private JdbcConnectionOptionsBuilder() {
+        }
+
+        public JdbcConnectionOptionsBuilder withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
+            this.driverName = driverName;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder 
withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
+            this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withMaxRetries(int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withUsername(String username) {
+            this.username = username;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withQuery(String query) {
+            this.query = query;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withBatchIntervalMs(int 
batchIntervalMs) {
+            this.batchIntervalMs = batchIntervalMs;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withXaDataSourceClassName(String 
xaDataSourceClassName) {
+            this.xaDataSourceClassName = xaDataSourceClassName;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withMaxCommitAttempts(int 
maxCommitAttempts) {
+            this.maxCommitAttempts = maxCommitAttempts;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withTransactionTimeoutSec(int 
transactionTimeoutSec) {
+            this.transactionTimeoutSec = transactionTimeoutSec;
+            return this;
+        }
+
+        public JdbcConnectionOptions build() {
+            JdbcConnectionOptions jdbcConnectionOptions = new 
JdbcConnectionOptions();
+            jdbcConnectionOptions.batchSize = this.batchSize;
+            jdbcConnectionOptions.batchIntervalMs = this.batchIntervalMs;
+            jdbcConnectionOptions.driverName = this.driverName;
+            jdbcConnectionOptions.maxRetries = this.maxRetries;
+            jdbcConnectionOptions.password = this.password;
+            jdbcConnectionOptions.connectionCheckTimeoutSeconds = 
this.connectionCheckTimeoutSeconds;
+            jdbcConnectionOptions.query = this.query;
+            jdbcConnectionOptions.url = this.url;
+            jdbcConnectionOptions.username = this.username;
+            jdbcConnectionOptions.transactionTimeoutSec = 
this.transactionTimeoutSec;
+            jdbcConnectionOptions.maxCommitAttempts = this.maxCommitAttempts;
+            jdbcConnectionOptions.xaDataSourceClassName = 
this.xaDataSourceClassName;
+            return jdbcConnectionOptions;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
new file mode 100644
index 000000000..ca3a9d68d
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import java.io.Serializable;
+
+/**
+ * This splits generator actually does nothing but wrapping the query 
parameters computed by the
+ * user before creating the {@link 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource} instance.
+ */
+public class JdbcGenericParameterValuesProvider implements 
JdbcParameterValuesProvider {
+
+    private final Serializable[][] parameters;
+
+    public JdbcGenericParameterValuesProvider(Serializable[][] parameters) {
+        this.parameters = parameters;
+    }
+
+    @Override
+    public Serializable[][] getParameterValues() {
+        // do nothing...precomputed externally
+        return parameters;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
new file mode 100644
index 000000000..385bc0231
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+
+/**
+ * This query parameters generator is an helper class to parameterize from/to 
queries on a numeric
+ * column. The generated array of from/to values will be equally sized to 
fetchSize (apart from the
+ * last one), ranging from minVal up to maxVal.
+ *
+ * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK 
<CODE>id</CODE>, using a
+ * query like:
+ *
+ * <PRE>
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ *
+ * <p>You can take advantage of this class to automatically generate the 
parameters of the BETWEEN
+ * clause, based on the passed constructor parameters.
+ */
+public class JdbcNumericBetweenParametersProvider implements 
JdbcParameterValuesProvider {
+
+    private final long minVal;
+    private final long maxVal;
+
+    private long batchSize;
+    private int batchNum;
+
+    /**
+     * NumericBetweenParametersProviderJdbc constructor.
+     *
+     * @param minVal the lower bound of the produced "from" values
+     * @param maxVal the upper bound of the produced "to" values
+     */
+    public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
+        checkArgument(minVal <= maxVal, "minVal must not be larger than 
maxVal");
+        this.minVal = minVal;
+        this.maxVal = maxVal;
+    }
+
+    /**
+     * NumericBetweenParametersProviderJdbc constructor.
+     *
+     * @param fetchSize the max distance between the produced from/to pairs
+     * @param minVal the lower bound of the produced "from" values
+     * @param maxVal the upper bound of the produced "to" values
+     */
+    public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, 
long maxVal) {
+        checkArgument(minVal <= maxVal, "minVal must not be larger than 
maxVal");
+        this.minVal = minVal;
+        this.maxVal = maxVal;
+        ofBatchSize(fetchSize);
+    }
+
+    public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
+        checkArgument(batchSize > 0, "Batch size must be positive");
+
+        long maxElemCount = (maxVal - minVal) + 1;
+        if (batchSize > maxElemCount) {
+            batchSize = maxElemCount;
+        }
+        this.batchSize = batchSize;
+        this.batchNum = new Double(Math.ceil((double) maxElemCount / 
batchSize)).intValue();
+        return this;
+    }
+
+    public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
+        checkArgument(batchNum > 0, "Batch number must be positive");
+
+        long maxElemCount = (maxVal - minVal) + 1;
+        if (batchNum > maxElemCount) {
+            batchNum = (int) maxElemCount;
+        }
+        this.batchNum = batchNum;
+        this.batchSize = new Double(Math.ceil((double) maxElemCount / 
batchNum)).longValue();
+        return this;
+    }
+
+    @Override
+    public Serializable[][] getParameterValues() {
+        checkState(
+                batchSize > 0,
+                "Batch size and batch number must be positive. Have you called 
`ofBatchSize` or `ofBatchNum`?");
+
+        long maxElemCount = (maxVal - minVal) + 1;
+        long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+        Serializable[][] parameters = new Serializable[batchNum][2];
+        long start = minVal;
+        for (int i = 0; i < batchNum; i++) {
+            long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+            parameters[i] = new Long[] {start, end};
+            start = end + 1;
+        }
+        return parameters;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
new file mode 100644
index 000000000..7b90bc8be
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import java.io.Serializable;
+
+/**
+ * This interface is used by the {@link 
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource} to compute 
the list of parallel query to
+ * run (i.e. splits). Each query will be parameterized using a row of the 
matrix provided by each
+ * {@link JdbcParameterValuesProvider} implementation.
+ */
+public interface JdbcParameterValuesProvider {
+
+    /** Returns the necessary parameters array to use for query in parallel a 
table. */
+    Serializable[][] getParameterValues();
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
index 2f8b78bee..6b323bc73 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
@@ -46,8 +46,8 @@ public interface XaFacade
         extends JdbcConnectionProvider, Serializable, AutoCloseable {
 
     static XaFacade fromJdbcConnectionOptions(
-            JdbcConnectorOptions jdbcConnectorOptions) {
-        return  new XaFacadeImplAutoLoad(jdbcConnectorOptions);
+            JdbcConnectionOptions jdbcConnectionOptions) {
+        return  new XaFacadeImplAutoLoad(jdbcConnectionOptions);
     }
 
     void open() throws Exception;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
index d8c969ce7..32c64f44a 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
@@ -35,7 +35,7 @@ import static java.util.Optional.empty;
 import static java.util.Optional.of;
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable;
 
 import org.slf4j.Logger;
@@ -74,14 +74,13 @@ public class XaFacadeImplAutoLoad
         new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, 
XA_HEURMIX));
     private static final int MAX_RECOVER_CALLS = 100;
 
-    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final JdbcConnectionOptions jdbcConnectionOptions;
     private transient XAResource xaResource;
     private transient Connection connection;
     private transient XAConnection xaConnection;
 
-    XaFacadeImplAutoLoad(JdbcConnectorOptions jdbcConnectorOptions) {
-        checkState(jdbcConnectorOptions.isExactlyOnce(), "is_exactly_once 
config error");
-        this.jdbcConnectorOptions = jdbcConnectorOptions;
+    XaFacadeImplAutoLoad(JdbcConnectionOptions jdbcConnectionOptions) {
+        this.jdbcConnectionOptions = jdbcConnectionOptions;
     }
 
     @Override
@@ -89,16 +88,16 @@ public class XaFacadeImplAutoLoad
         checkState(!isOpen(), "already connected");
         XADataSource ds;
         try {
-            ds = (XADataSource) 
DataSourceUtils.buildCommonDataSource(jdbcConnectorOptions);
+            ds = (XADataSource) 
DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
         }
         catch (Exception e) {
             throw new SQLException(e);
         }
         xaConnection = ds.getXAConnection();
         xaResource = xaConnection.getXAResource();
-        if (jdbcConnectorOptions.getTransactionTimeoutSec().isPresent()) {
+        if (jdbcConnectionOptions.getTransactionTimeoutSec().isPresent()) {
             try {
-                
xaResource.setTransactionTimeout(jdbcConnectorOptions.getTransactionTimeoutSec().get());
+                
xaResource.setTransactionTimeout(jdbcConnectionOptions.getTransactionTimeoutSec().get());
             }
             catch (XAException e) {
                 throw new SQLException(e);
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 849ca50c8..b4527a9ff 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -23,11 +23,11 @@ import static 
com.google.common.base.Preconditions.checkState;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
@@ -74,10 +74,10 @@ public class JdbcExactlyOnceSinkWriter
         SinkWriter.Context sinkcontext,
         SeaTunnelContext context,
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
-        JdbcConnectorOptions jdbcConnectorOptions,
+        JdbcSinkOptions jdbcSinkOptions,
         List<JdbcSinkState> states) {
         checkArgument(
-            jdbcConnectorOptions.getMaxRetries() == 0,
+            jdbcSinkOptions.getJdbcConnectionOptions().getMaxRetries() == 0,
             "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
                 + "cause duplicates.");
 
@@ -85,13 +85,14 @@ public class JdbcExactlyOnceSinkWriter
         this.sinkcontext = sinkcontext;
         this.recoverStates = states;
         this.xidGenerator = XidGenerator.semanticXidGenerator();
+        checkState(jdbcSinkOptions.isExactlyOnce(), "is_exactly_once config 
error");
         this.xaFacade = XaFacade.fromJdbcConnectionOptions(
-            jdbcConnectorOptions);
+            jdbcSinkOptions.getJdbcConnectionOptions());
 
         this.outputFormat = new JdbcOutputFormat<>(
             xaFacade,
-            jdbcConnectorOptions,
-            () -> new 
SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), 
statementBuilder));
+            jdbcSinkOptions.getJdbcConnectionOptions(),
+            () -> new 
SimpleBatchStatementExecutor<>(jdbcSinkOptions.getJdbcConnectionOptions().getQuery(),
 statementBuilder));
 
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
     }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index a225f38f8..219b00c4d 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -27,8 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -53,18 +53,18 @@ public class JdbcSink
 
     private SeaTunnelContext seaTunnelContext;
 
-    private JdbcConnectorOptions jdbcConnectorOptions;
+    private JdbcSinkOptions jdbcSinkOptions;
 
     @Override
     public String getPluginName() {
-        return "jdbc";
+        return "Jdbc";
     }
 
     @Override
     public void prepare(Config pluginConfig)
         throws PrepareFailException {
         this.pluginConfig = pluginConfig;
-        this.jdbcConnectorOptions = new 
JdbcConnectorOptions(this.pluginConfig);
+        this.jdbcSinkOptions = new JdbcSinkOptions(this.pluginConfig);
     }
 
     @Override
@@ -73,19 +73,19 @@ public class JdbcSink
         SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> sinkWriter;
         // TODO SeatunnelTyoeInfo is not good enough to get typesArray
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> 
JdbcUtils.setRecordToStatement(st, null, row);
-        if (jdbcConnectorOptions.isExactlyOnce()) {
+        if (jdbcSinkOptions.isExactlyOnce()) {
             sinkWriter = new JdbcExactlyOnceSinkWriter(
                 context,
                 seaTunnelContext,
                 statementBuilder,
-                jdbcConnectorOptions,
+                jdbcSinkOptions,
                 new ArrayList<>()
             );
         } else {
             sinkWriter = new JdbcSinkWriter(
                 context,
                 statementBuilder,
-                jdbcConnectorOptions);
+                jdbcSinkOptions);
         }
 
         return sinkWriter;
@@ -94,13 +94,13 @@ public class JdbcSink
     @Override
     public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> 
restoreWriter(SinkWriter.Context context, List<JdbcSinkState> states)
         throws IOException {
-        if (jdbcConnectorOptions.isExactlyOnce()) {
+        if (jdbcSinkOptions.isExactlyOnce()) {
             JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> 
JdbcUtils.setRecordToStatement(st, null, row);
             return new JdbcExactlyOnceSinkWriter(
                 context,
                 seaTunnelContext,
                 statementBuilder,
-                jdbcConnectorOptions,
+                jdbcSinkOptions,
                 states
             );
         }
@@ -110,8 +110,8 @@ public class JdbcSink
     @Override
     public Optional<SinkAggregatedCommitter<XidInfo, 
JdbcAggregatedCommitInfo>> createAggregatedCommitter()
         throws IOException {
-        if (jdbcConnectorOptions.isExactlyOnce()) {
-            return Optional.of(new 
JdbcSinkAggregatedCommitter(jdbcConnectorOptions));
+        if (jdbcSinkOptions.isExactlyOnce()) {
+            return Optional.of(new 
JdbcSinkAggregatedCommitter(jdbcSinkOptions));
         }
         return Optional.empty();
     }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
index e5c9f308b..f0ee2671e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
@@ -36,15 +36,15 @@ public class JdbcSinkAggregatedCommitter
 
     private final XaFacade xaFacade;
     private final XaGroupOps xaGroupOps;
-    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final JdbcSinkOptions jdbcSinkOptions;
 
     public JdbcSinkAggregatedCommitter(
-        JdbcConnectorOptions jdbcConnectorOptions
+        JdbcSinkOptions jdbcSinkOptions
     ) {
         this.xaFacade = XaFacade.fromJdbcConnectionOptions(
-            jdbcConnectorOptions);
+            jdbcSinkOptions.getJdbcConnectionOptions());
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
-        this.jdbcConnectorOptions = jdbcConnectorOptions;
+        this.jdbcSinkOptions = jdbcSinkOptions;
     }
 
     private void tryOpen() throws IOException {
@@ -61,7 +61,7 @@ public class JdbcSinkAggregatedCommitter
     public List<JdbcAggregatedCommitInfo> 
commit(List<JdbcAggregatedCommitInfo> aggregatedCommitInfos) throws IOException 
{
         tryOpen();
         return aggregatedCommitInfos.stream().map(aggregatedCommitInfo -> {
-            GroupXaOperationResult<XidInfo> result = 
xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false, 
jdbcConnectorOptions.getMaxCommitAttempts());
+            GroupXaOperationResult<XidInfo> result = 
xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false, 
jdbcSinkOptions.getJdbcConnectionOptions().getMaxCommitAttempts());
             return new JdbcAggregatedCommitInfo(result.getForRetry());
         }).filter(ainfo -> 
!ainfo.getXidInfoList().isEmpty()).collect(Collectors.toList());
     }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
index 626f379a5..f290eec86 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
 import org.apache.seatunnel.api.sink.SinkCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
@@ -32,15 +32,15 @@ public class JdbcSinkCommitter
     implements SinkCommitter<XidInfo> {
     private final XaFacade xaFacade;
     private final XaGroupOps xaGroupOps;
-    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final JdbcConnectionOptions jdbcConnectionOptions;
 
     public JdbcSinkCommitter(
-        JdbcConnectorOptions jdbcConnectorOptions
+        JdbcConnectionOptions jdbcConnectionOptions
     )
         throws IOException {
-        this.jdbcConnectorOptions = jdbcConnectorOptions;
+        this.jdbcConnectionOptions = jdbcConnectionOptions;
         this.xaFacade = XaFacade.fromJdbcConnectionOptions(
-            jdbcConnectorOptions);
+            jdbcConnectionOptions);
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
         try {
             xaFacade.open();
@@ -53,7 +53,7 @@ public class JdbcSinkCommitter
     @Override
     public List<XidInfo> commit(List<XidInfo> committables) {
         return xaGroupOps
-            .commit(committables, false, 
jdbcConnectorOptions.getMaxCommitAttempts())
+            .commit(committables, false, 
jdbcConnectionOptions.getMaxCommitAttempts())
             .getForRetry();
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 548f10735..f081ad1c0 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -45,15 +45,15 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
     public JdbcSinkWriter(
         SinkWriter.Context context,
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
-        JdbcConnectorOptions jdbcConnectorOptions) {
+        JdbcSinkOptions jdbcSinkOptions) {
 
-        JdbcConnectionProvider connectionProvider = new 
SimpleJdbcConnectionProvider(jdbcConnectorOptions);
+        JdbcConnectionProvider connectionProvider = new 
SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
 
         this.context = context;
         this.outputFormat = new JdbcOutputFormat<>(
             connectionProvider,
-            jdbcConnectorOptions,
-            () -> new 
SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), 
statementBuilder));
+            jdbcSinkOptions.getJdbcConnectionOptions(),
+            () -> new 
SimpleBatchStatementExecutor<>(jdbcSinkOptions.getJdbcConnectionOptions().getQuery(),
 statementBuilder));
     }
 
     private void tryOpen() throws IOException {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
new file mode 100644
index 000000000..41d07e92e
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+@AutoService(SeaTunnelSource.class)
+public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, 
JdbcSourceSplit, JdbcSourceState> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+
+    private SeaTunnelContext seaTunnelContext;
+    private JdbcSourceOptions jdbcSourceOptions;
+    private SeaTunnelRowType typeInfo;
+
+    private JdbcDialect jdbcDialect;
+    private JdbcInputFormat inputFormat;
+    private PartitionParameter partitionParameter;
+    private JdbcConnectionProvider jdbcConnectionProvider;
+
+    private String query;
+
+    @Override
+    public String getPluginName() {
+        return "Jdbc";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        jdbcSourceOptions = new JdbcSourceOptions(pluginConfig);
+        jdbcConnectionProvider = new 
SimpleJdbcConnectionProvider(jdbcSourceOptions.getJdbcConnectionOptions());
+        query = jdbcSourceOptions.getJdbcConnectionOptions().query;
+        jdbcDialect = 
JdbcDialectLoader.load(jdbcSourceOptions.getJdbcConnectionOptions().getUrl());
+        try {
+            typeInfo = 
initTableField(jdbcConnectionProvider.getOrEstablishConnection());
+            partitionParameter = 
initPartitionParameterAndExtendSql(jdbcConnectionProvider.getOrEstablishConnection());
+        } catch (Exception e) {
+            throw new PrepareFailException("jdbc", PluginType.SOURCE, 
e.toString());
+        }
+
+        inputFormat = new JdbcInputFormat(
+            jdbcConnectionProvider,
+            jdbcDialect.getRowConverter(),
+            typeInfo,
+            query,
+            0,
+            true
+        );
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        Connection conn;
+        SeaTunnelRowType seaTunnelDataType = null;
+        try {
+            conn = jdbcConnectionProvider.getOrEstablishConnection();
+            seaTunnelDataType = initTableField(conn);
+        } catch (Exception e) {
+            LOG.warn("get row type info exception", e);
+        }
+        this.typeInfo = seaTunnelDataType;
+        return seaTunnelDataType;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, JdbcSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
+        return new JdbcSourceReader(inputFormat, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> 
createEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext) throws Exception {
+        return new JdbcSourceSplitEnumerator(enumeratorContext, 
jdbcSourceOptions, partitionParameter);
+    }
+
+    @Override
+    public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceState checkpointState) throws Exception {
+        return new JdbcSourceSplitEnumerator(enumeratorContext, 
jdbcSourceOptions, partitionParameter);
+    }
+
+    @Override
+    public Serializer<JdbcSourceState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    private SeaTunnelRowType initTableField(Connection conn) {
+        JdbcDialectTypeMapper jdbcDialectTypeMapper = 
jdbcDialect.getJdbcDialectTypeMapper();
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+            PreparedStatement ps = 
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                fieldNames.add(resultSetMetaData.getColumnName(i));
+                
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
+            }
+        } catch (Exception e) {
+            LOG.warn("get row type info exception", e);
+        }
+        return new SeaTunnelRowType(fieldNames.toArray(new 
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+    }
+
+    private PartitionParameter initPartitionParameter(String columnName, 
Connection connection) throws SQLException {
+        long max = Long.MAX_VALUE;
+        long min = Long.MIN_VALUE;
+        if (jdbcSourceOptions.getPartitionLowerBound().isPresent() && 
jdbcSourceOptions.getPartitionUpperBound().isPresent()) {
+            max = jdbcSourceOptions.getPartitionUpperBound().get();
+            min = jdbcSourceOptions.getPartitionLowerBound().get();
+            return new PartitionParameter(columnName, min, max);
+        }
+        try (ResultSet rs = 
connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s) 
" +
+            "FROM (%s) tt", columnName, columnName, query))) {
+            if (rs.next()) {
+                max = jdbcSourceOptions.getPartitionUpperBound().isPresent() ? 
jdbcSourceOptions.getPartitionUpperBound().get() :
+                    Long.parseLong(rs.getString(1));
+                min = jdbcSourceOptions.getPartitionLowerBound().isPresent() ? 
jdbcSourceOptions.getPartitionLowerBound().get() :
+                    Long.parseLong(rs.getString(2));
+            }
+        }
+        return new PartitionParameter(columnName, min, max);
+    }
+
+    private PartitionParameter initPartitionParameterAndExtendSql(Connection 
connection) throws SQLException {
+        if (jdbcSourceOptions.getPartitionColumn().isPresent()) {
+            String partitionColumn = 
jdbcSourceOptions.getPartitionColumn().get();
+            Map<String, SeaTunnelDataType<?>> fieldTypes = new HashMap<>();
+            for (int i = 0; i < typeInfo.getFieldNames().length; i++) {
+                fieldTypes.put(typeInfo.getFieldName(i), 
typeInfo.getFieldType(i));
+            }
+            if (!fieldTypes.containsKey(partitionColumn)) {
+                throw new IllegalArgumentException(String.format("field %s not 
contain in query %s",
+                    partitionColumn, query));
+            }
+            SeaTunnelDataType<?> partitionColumnType = 
fieldTypes.get(partitionColumn);
+            if (!isNumericType(partitionColumnType)) {
+                throw new IllegalArgumentException(String.format("%s is not 
numeric type", partitionColumn));
+            }
+            PartitionParameter partitionParameter = 
initPartitionParameter(partitionColumn, connection);
+            query = String.format("SELECT * FROM (%s) tt where " + 
partitionColumn + " >= ? AND " + partitionColumn + " <= ?", query);
+
+            return partitionParameter;
+        } else {
+            LOG.info("The partition_column parameter is not configured, and 
the source parallelism is set to 1");
+        }
+
+        return null;
+    }
+
+    private boolean isNumericType(SeaTunnelDataType<?> type) {
+        return type.equals(BasicType.INT_TYPE) || 
type.equals(BasicType.BIG_INT_TYPE)
+            || type.equals(BasicType.LONG_TYPE);
+    }
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
new file mode 100644
index 000000000..3b222958c
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class JdbcSourceReader implements SourceReader<SeaTunnelRow, 
JdbcSourceSplit> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
+
+    SourceReader.Context context;
+    Deque<JdbcSourceSplit> splits = new LinkedList<>();
+    JdbcInputFormat inputFormat;
+    boolean noMoreSplit;
+
+    public JdbcSourceReader(JdbcInputFormat inputFormat, SourceReader.Context 
context) {
+        this.inputFormat = inputFormat;
+        this.context = context;
+    }
+
+    @Override
+    public void open() throws Exception {
+        inputFormat.openInputFormat();
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputFormat.closeInputFormat();
+    }
+
+    @Override
+    @SuppressWarnings("magicnumber")
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        JdbcSourceSplit split = splits.poll();
+        if (null != split) {
+            inputFormat.open(split);
+            while (!inputFormat.reachedEnd()) {
+                SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
+                output.collect(seaTunnelRow);
+            }
+            inputFormat.close();
+        } else if (noMoreSplit) {
+            // signal to the source that we have reached the end of the data.
+            LOG.info("Closed the bounded jdbc source");
+            context.signalNoMoreElement();
+        } else {
+            Thread.sleep(1000L);
+        }
+    }
+
+    @Override
+    public List<JdbcSourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return new ArrayList<>(splits);
+    }
+
+    @Override
+    public void addSplits(List<JdbcSourceSplit> splits) {
+        this.splits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
new file mode 100644
index 000000000..106cb439a
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class JdbcSourceSplit implements SourceSplit {
+
+    Object[] parameterValues;
+    Integer splitId;
+
+    @Override
+    public String splitId() {
+        return splitId.toString();
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
new file mode 100644
index 000000000..f3b5368e9
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class JdbcSourceSplitEnumerator implements 
SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
+
+    SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
+    List<JdbcSourceSplit> allSplit = new ArrayList<>();
+    JdbcSourceOptions jdbcSourceOptions;
+    PartitionParameter partitionParameter;
+
+    public 
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> 
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter 
partitionParameter) {
+        this.enumeratorContext = enumeratorContext;
+        this.jdbcSourceOptions = jdbcSourceOptions;
+        this.partitionParameter = partitionParameter;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public void run() throws Exception {
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        int parallelism = enumeratorContext.currentParallelism();
+        if (allSplit.isEmpty()) {
+            if (null != partitionParameter) {
+                JdbcNumericBetweenParametersProvider 
jdbcNumericBetweenParametersProvider = new 
JdbcNumericBetweenParametersProvider(partitionParameter.minValue, 
partitionParameter.maxValue).ofBatchNum(parallelism);
+                Serializable[][] parameterValues = 
jdbcNumericBetweenParametersProvider.getParameterValues();
+                for (int i = 0; i < parameterValues.length; i++) {
+                    allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+                }
+            } else {
+                allSplit.add(new JdbcSourceSplit(null, 0));
+            }
+        }
+        // Filter the split that the current task needs to run
+        List<JdbcSourceSplit> splits = allSplit.stream().filter(p -> p.splitId 
% parallelism == subtaskId).collect(Collectors.toList());
+        enumeratorContext.assignSplit(subtaskId, splits);
+        enumeratorContext.signalNoMoreSplits(subtaskId);
+    }
+
+    @Override
+    public JdbcSourceState snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
new file mode 100644
index 000000000..c16ebb46d
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PartitionParameter implements Serializable {
+
+    String partitionColumnName;
+    Long minValue;
+    Long maxValue;
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
new file mode 100644
index 000000000..be9e088ec
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import java.io.Serializable;
+
+public class JdbcSourceState implements Serializable {
+}

Reply via email to