This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 463cafe38 [api-draft][connector] Add SeaTunnel jdbc source (#2048)
463cafe38 is described below
commit 463cafe38e35c62318b3b3a11a477a8b1786c452
Author: ic4y <[email protected]>
AuthorDate: Mon Jun 27 13:15:51 2022 +0800
[api-draft][connector] Add SeaTunnel jdbc source (#2048)
* Add SeaTunnel jdbc sink (#1946)
* Add license head
* fix checkStyle err
---
seatunnel-connectors/plugin-mapping.properties | 2 +
.../seatunnel/jdbc/config/JdbcConfig.java | 102 ++++++++++
.../seatunnel/jdbc/config/JdbcSinkOptions.java | 43 ++++
.../seatunnel/jdbc/config/JdbcSourceOptions.java | 77 ++++++++
.../seatunnel/jdbc/internal/JdbcInputFormat.java | 220 +++++++++++++++++++++
.../seatunnel/jdbc/internal/JdbcOutputFormat.java | 22 +--
.../jdbc/internal/connection/DataSourceUtils.java | 20 +-
.../connection/SimpleJdbcConnectionProvider.java | 6 +-
.../converter/AbstractJdbcRowConverter.java | 92 +++++++++
.../jdbc/internal/converter/JdbcRowConverter.java | 40 ++++
.../jdbc/internal/dialect/JdbcDialect.java | 52 +++++
.../jdbc/internal/dialect/JdbcDialectFactory.java | 40 ++++
.../jdbc/internal/dialect/JdbcDialectLoader.java | 102 ++++++++++
.../internal/dialect/JdbcDialectTypeMapper.java | 34 ++++
.../dialect/mysql/MySqlDialectFactory.java | 38 ++++
.../internal/dialect/mysql/MySqlTypeMapper.java | 167 ++++++++++++++++
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 39 ++++
.../dialect/mysql/MysqlJdbcRowConverter.java | 38 ++++
.../internal/options/JdbcConnectionOptions.java | 200 +++++++++++++++++++
.../split/JdbcGenericParameterValuesProvider.java | 39 ++++
.../JdbcNumericBetweenParametersProvider.java | 116 +++++++++++
.../split/JdbcParameterValuesProvider.java | 31 +++
.../seatunnel/jdbc/internal/xa/XaFacade.java | 6 +-
.../jdbc/internal/xa/XaFacadeImplAutoLoad.java | 15 +-
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 13 +-
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 22 +--
.../jdbc/sink/JdbcSinkAggregatedCommitter.java | 12 +-
.../seatunnel/jdbc/sink/JdbcSinkCommitter.java | 12 +-
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 10 +-
.../seatunnel/jdbc/source/JdbcSource.java | 211 ++++++++++++++++++++
.../seatunnel/jdbc/source/JdbcSourceReader.java | 97 +++++++++
.../seatunnel/jdbc/source/JdbcSourceSplit.java | 36 ++++
.../jdbc/source/JdbcSourceSplitEnumerator.java | 100 ++++++++++
.../seatunnel/jdbc/source/PartitionParameter.java | 32 +++
.../seatunnel/jdbc/state/JdbcSourceState.java | 23 +++
35 files changed, 2040 insertions(+), 69 deletions(-)
diff --git a/seatunnel-connectors/plugin-mapping.properties
b/seatunnel-connectors/plugin-mapping.properties
index 992c399a0..d06354594 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -92,3 +92,5 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.source.Http = seatunnel-connector-seatunnel-http
seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
+seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
+seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
new file mode 100644
index 000000000..4a705247b
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.Serializable;
+
+public class JdbcConfig implements Serializable {
+
+ public static final String URL = "url";
+
+ public static final String DRIVER = "driver";
+
+ public static final String CONNECTION_CHECK_TIMEOUT_SEC =
"connection_check_timeout_sec";
+
+ public static final String MAX_RETRIES = "max_retries";
+
+ public static final String USER = "user";
+
+ public static final String PASSWORD = "password";
+
+ public static final String QUERY = "query";
+
+ public static final String PARALLELISM = "parallelism";
+
+
+ public static final String BATCH_SIZE = "batch_size";
+
+ public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+
+
+ public static final String IS_EXACTLY_ONCE = "is_exactly_once";
+
+ public static final String XA_DATA_SOURCE_CLASS_NAME =
"xa_data_source_class_name";
+
+
+ public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts";
+
+ public static final String TRANSACTION_TIMEOUT_SEC =
"transaction_timeout_sec";
+
+
+ //source config
+ public static final String PARTITION_COLUMN = "partition_column";
+ public static final String PARTITION_UPPER_BOUND = "partition_upper_bound";
+ public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";
+
+ public static JdbcConnectionOptions buildJdbcConnectionOptions(Config
config) {
+
+ JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions();
+ jdbcOptions.url = config.getString(JdbcConfig.URL);
+ jdbcOptions.driverName = config.getString(JdbcConfig.DRIVER);
+ if (config.hasPath(JdbcConfig.USER)) {
+ jdbcOptions.username = config.getString(JdbcConfig.USER);
+ }
+ if (config.hasPath(JdbcConfig.PASSWORD)) {
+ jdbcOptions.password = config.getString(JdbcConfig.PASSWORD);
+ }
+ jdbcOptions.query = config.getString(JdbcConfig.QUERY);
+
+ if (config.hasPath(JdbcConfig.MAX_RETRIES)) {
+ jdbcOptions.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES);
+ }
+ if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) {
+ jdbcOptions.connectionCheckTimeoutSeconds =
config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC);
+ }
+ if (config.hasPath(JdbcConfig.BATCH_SIZE)) {
+ jdbcOptions.batchSize = config.getInt(JdbcConfig.BATCH_SIZE);
+ }
+ if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) {
+ jdbcOptions.batchIntervalMs =
config.getInt(JdbcConfig.BATCH_INTERVAL_MS);
+ }
+
+ if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) {
+ jdbcOptions.xaDataSourceClassName =
config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME);
+ if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) {
+ jdbcOptions.maxCommitAttempts =
config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS);
+ }
+ if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) {
+ jdbcOptions.transactionTimeoutSec =
config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
+ }
+ }
+ return jdbcOptions;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
new file mode 100644
index 000000000..964c859e5
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class JdbcSinkOptions implements Serializable {
+ private JdbcConnectionOptions jdbcConnectionOptions;
+ private boolean isExactlyOnce;
+
+ public JdbcSinkOptions(Config config) {
+ this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
+ if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE) &&
config.getBoolean(JdbcConfig.IS_EXACTLY_ONCE)) {
+ this.isExactlyOnce = true;
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
new file mode 100644
index 000000000..69e3ae454
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
+
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+@Data
+@AllArgsConstructor
+public class JdbcSourceOptions implements Serializable {
+ private JdbcConnectionOptions jdbcConnectionOptions;
+ private String partitionColumn;
+ private Long partitionUpperBound;
+ private Long partitionLowerBound;
+
+ private Integer parallelism;
+
+ public JdbcSourceOptions(Config config) {
+ this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
+ if (config.hasPath(JdbcConfig.PARTITION_COLUMN)) {
+ this.partitionColumn =
config.getString(JdbcConfig.PARTITION_COLUMN);
+ }
+ if (config.hasPath(JdbcConfig.PARTITION_UPPER_BOUND)) {
+ this.partitionUpperBound =
config.getLong(JdbcConfig.PARTITION_UPPER_BOUND);
+ }
+ if (config.hasPath(JdbcConfig.PARTITION_LOWER_BOUND)) {
+ this.partitionLowerBound =
config.getLong(JdbcConfig.PARTITION_LOWER_BOUND);
+ }
+ if (config.hasPath(JdbcConfig.PARALLELISM)) {
+ this.parallelism = config.getInt(JdbcConfig.PARALLELISM);
+ }
+ }
+
+ public JdbcConnectionOptions getJdbcConnectionOptions() {
+ return jdbcConnectionOptions;
+ }
+
+ public Optional<String> getPartitionColumn() {
+ return Optional.ofNullable(partitionColumn);
+ }
+
+ public Optional<Long> getPartitionUpperBound() {
+ return Optional.ofNullable(partitionUpperBound);
+ }
+
+ public Optional<Long> getPartitionLowerBound() {
+ return Optional.ofNullable(partitionLowerBound);
+ }
+
+ public Optional<Integer> getParallelism() {
+ return Optional.ofNullable(parallelism);
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
new file mode 100644
index 000000000..1baf17256
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * InputFormat to read data from a database and generate Rows. The InputFormat
has to be configured
+ * using the supplied InputFormatBuilder. A valid RowTypeInfo must be properly
configured in the
+ * builder
+ */
+
+public class JdbcInputFormat implements Serializable {
+
+ protected static final long serialVersionUID = 2L;
+ protected static final Logger LOG =
LoggerFactory.getLogger(JdbcInputFormat.class);
+
+ protected JdbcConnectionProvider connectionProvider;
+ protected JdbcRowConverter jdbcRowConverter;
+ protected String queryTemplate;
+ protected SeaTunnelRowType typeInfo;
+ protected int fetchSize;
+ // Boolean to distinguish between default value and explicitly set
autoCommit mode.
+ protected Boolean autoCommit;
+
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+
+ protected boolean hasNext;
+
+ public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
+ JdbcRowConverter jdbcRowConverter,
+ SeaTunnelRowType typeInfo,
+ String queryTemplate,
+ int fetchSize,
+ Boolean autoCommit
+ ) {
+ this.connectionProvider = connectionProvider;
+ this.jdbcRowConverter = jdbcRowConverter;
+ this.typeInfo = typeInfo;
+ this.queryTemplate = queryTemplate;
+ this.fetchSize = fetchSize;
+ this.autoCommit = autoCommit;
+ }
+
+ public void openInputFormat() {
+ // called once per inputFormat (on open)
+ try {
+ Connection dbConn = connectionProvider.getOrEstablishConnection();
+
+ // set autoCommit mode only if it was explicitly configured.
+ // keep connection default otherwise.
+ if (autoCommit != null) {
+ dbConn.setAutoCommit(autoCommit);
+ }
+
+ statement = dbConn.prepareStatement(queryTemplate);
+ if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
+ statement.setFetchSize(fetchSize);
+ }
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("open() failed." +
se.getMessage(), se);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException(
+ "JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+ }
+ }
+
+ public void closeInputFormat() {
+ // called once per inputFormat (on close)
+ try {
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException se) {
+ LOG.info("Inputformat Statement couldn't be closed - " +
se.getMessage());
+ } finally {
+ statement = null;
+ }
+
+ connectionProvider.closeConnection();
+
+ }
+
+ /**
+ * Connects to the source database and executes the query
+ *
+ * @param inputSplit which is ignored if this InputFormat is executed as a
non-parallel source,
+ * a "hook" to the query parameters otherwise (using its
<i>parameterId</i>)
+ * @throws IOException if there's an error during the execution of the
query
+ */
+ public void open(JdbcSourceSplit inputSplit) throws IOException {
+ try {
+ Object[] parameterValues = inputSplit.getParameterValues();
+ if (parameterValues != null) {
+ for (int i = 0; i < parameterValues.length; i++) {
+ Object param = parameterValues[i];
+ if (param instanceof String) {
+ statement.setString(i + 1, (String) param);
+ } else if (param instanceof Long) {
+ statement.setLong(i + 1, (Long) param);
+ } else if (param instanceof Integer) {
+ statement.setInt(i + 1, (Integer) param);
+ } else if (param instanceof Double) {
+ statement.setDouble(i + 1, (Double) param);
+ } else if (param instanceof Boolean) {
+ statement.setBoolean(i + 1, (Boolean) param);
+ } else if (param instanceof Float) {
+ statement.setFloat(i + 1, (Float) param);
+ } else if (param instanceof BigDecimal) {
+ statement.setBigDecimal(i + 1, (BigDecimal) param);
+ } else if (param instanceof Byte) {
+ statement.setByte(i + 1, (Byte) param);
+ } else if (param instanceof Short) {
+ statement.setShort(i + 1, (Short) param);
+ } else if (param instanceof Date) {
+ statement.setDate(i + 1, (Date) param);
+ } else if (param instanceof Time) {
+ statement.setTime(i + 1, (Time) param);
+ } else if (param instanceof Timestamp) {
+ statement.setTimestamp(i + 1, (Timestamp) param);
+ } else if (param instanceof Array) {
+ statement.setArray(i + 1, (Array) param);
+ } else {
+ // extends with other types if needed
+ throw new IllegalArgumentException(
+ "open() failed. Parameter "
+ + i
+ + " of type "
+ + param.getClass()
+ + " is not handled (yet).");
+ }
+ }
+ }
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("open() failed." +
se.getMessage(), se);
+ }
+ }
+
+ /**
+ * Closes all resources used.
+ *
+ * @throws IOException Indicates that a resource could not be closed.
+ */
+ public void close() throws IOException {
+ if (resultSet == null) {
+ return;
+ }
+ try {
+ resultSet.close();
+ } catch (SQLException se) {
+ LOG.info("Inputformat ResultSet couldn't be closed - " +
se.getMessage());
+ }
+ }
+
+ /**
+ * Checks whether all data has been read.
+ *
+ * @return boolean value indication whether all data has been read.
+ */
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ /**
+ * Convert a row of data to seatunnelRow
+ */
+ public SeaTunnelRow nextRecord() throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet,
resultSet.getMetaData(), typeInfo);
+ // update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return seaTunnelRow;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(),
se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
index f7738140c..afa9ebbba 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -21,7 +21,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -51,7 +51,7 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
private static final Logger LOG =
LoggerFactory.getLogger(JdbcOutputFormat.class);
- private final JdbcConnectorOptions jdbcConnectorOptions;
+ private final JdbcConnectionOptions jdbcConnectionOptions;
private final StatementExecutorFactory<E> statementExecutorFactory;
private transient E jdbcStatementExecutor;
@@ -64,10 +64,10 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
public JdbcOutputFormat(
JdbcConnectionProvider connectionProvider,
- JdbcConnectorOptions jdbcConnectorOptions,
+ JdbcConnectionOptions jdbcConnectionOptions,
StatementExecutorFactory<E> statementExecutorFactory) {
this.connectionProvider = checkNotNull(connectionProvider);
- this.jdbcConnectorOptions = checkNotNull(jdbcConnectorOptions);
+ this.jdbcConnectionOptions = checkNotNull(jdbcConnectionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
}
@@ -85,7 +85,7 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
}
jdbcStatementExecutor =
createAndOpenStatementExecutor(statementExecutorFactory);
- if (jdbcConnectorOptions.getBatchIntervalMs() != 0 &&
jdbcConnectorOptions.getBatchSize() != 1) {
+ if (jdbcConnectionOptions.getBatchIntervalMs() != 0 &&
jdbcConnectionOptions.getBatchSize() != 1) {
this.scheduler =
Executors.newScheduledThreadPool(
1, runnable -> {
@@ -109,8 +109,8 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
}
}
},
- jdbcConnectorOptions.getBatchIntervalMs(),
- jdbcConnectorOptions.getBatchIntervalMs(),
+ jdbcConnectionOptions.getBatchIntervalMs(),
+ jdbcConnectionOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
}
@@ -140,8 +140,8 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
try {
addToBatch(record);
batchCount++;
- if (jdbcConnectorOptions.getBatchSize() > 0
- && batchCount >= jdbcConnectorOptions.getBatchSize()) {
+ if (jdbcConnectionOptions.getBatchSize() > 0
+ && batchCount >= jdbcConnectionOptions.getBatchSize()) {
flush();
}
}
@@ -159,7 +159,7 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
throws IOException {
checkFlushException();
final int sleepMs = 1000;
- for (int i = 0; i <= jdbcConnectorOptions.getMaxRetries(); i++) {
+ for (int i = 0; i <= jdbcConnectionOptions.getMaxRetries(); i++) {
try {
attemptFlush();
batchCount = 0;
@@ -167,7 +167,7 @@ public class JdbcOutputFormat<I, E extends
JdbcBatchStatementExecutor<I>>
}
catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
- if (i >= jdbcConnectorOptions.getMaxRetries()) {
+ if (i >= jdbcConnectionOptions.getMaxRetries()) {
ExceptionUtils.rethrowIOException(e);
}
try {
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
index 1e2e213a7..43ac649a3 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import com.google.common.base.CaseFormat;
import lombok.NonNull;
@@ -39,21 +39,21 @@ public class DataSourceUtils
private static final String SETTER_PREFIX = "set";
- public static CommonDataSource buildCommonDataSource(@NonNull
JdbcConnectorOptions jdbcConnectorOptions)
+ public static CommonDataSource buildCommonDataSource(@NonNull
JdbcConnectionOptions jdbcConnectionOptions)
throws InvocationTargetException, IllegalAccessException {
- CommonDataSource dataSource = (CommonDataSource)
loadDataSource(jdbcConnectorOptions.getXaDataSourceClassName());
- setProperties(dataSource,
buildDatabaseAccessConfig(jdbcConnectorOptions));
+ CommonDataSource dataSource = (CommonDataSource)
loadDataSource(jdbcConnectionOptions.getXaDataSourceClassName());
+ setProperties(dataSource,
buildDatabaseAccessConfig(jdbcConnectionOptions));
return dataSource;
}
- private static Map<String, Object>
buildDatabaseAccessConfig(JdbcConnectorOptions jdbcConnectorOptions) {
+ private static Map<String, Object>
buildDatabaseAccessConfig(JdbcConnectionOptions jdbcConnectionOptions) {
HashMap<String, Object> accessConfig = new HashMap<>();
- accessConfig.put("url", jdbcConnectorOptions.getUrl());
- if (jdbcConnectorOptions.getUsername().isPresent()) {
- accessConfig.put("user", jdbcConnectorOptions.getUsername().get());
+ accessConfig.put("url", jdbcConnectionOptions.getUrl());
+ if (jdbcConnectionOptions.getUsername().isPresent()) {
+ accessConfig.put("user",
jdbcConnectionOptions.getUsername().get());
}
- if (jdbcConnectorOptions.getPassword().isPresent()) {
- accessConfig.put("password",
jdbcConnectorOptions.getPassword().get());
+ if (jdbcConnectionOptions.getPassword().isPresent()) {
+ accessConfig.put("password",
jdbcConnectionOptions.getPassword().get());
}
return accessConfig;
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index 7e69e990e..f6e4d56f3 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
import static com.google.common.base.Preconditions.checkNotNull;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import lombok.NonNull;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class SimpleJdbcConnectionProvider
private static final long serialVersionUID = 1L;
- private final JdbcConnectorOptions jdbcOptions;
+ private final JdbcConnectionOptions jdbcOptions;
private transient Driver loadedDriver;
private transient Connection connection;
@@ -60,7 +60,7 @@ public class SimpleJdbcConnectionProvider
DriverManager.getDrivers();
}
- public SimpleJdbcConnectionProvider(@NonNull JdbcConnectorOptions
jdbcOptions) {
+ public SimpleJdbcConnectionProvider(@NonNull JdbcConnectionOptions
jdbcOptions) {
this.jdbcOptions = jdbcOptions;
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
new file mode 100644
index 000000000..57d3501b0
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for all converters that convert between JDBC object and
Seatunnel internal object.
+ */
+public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
+
+ public abstract String converterName();
+
+ public AbstractJdbcRowConverter() {
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:Indentation")
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData,
SeaTunnelRowType typeInfo) throws SQLException {
+
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+
+ for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+ if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBoolean(i);
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getByte(i);
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getShort(i);
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getInt(i);
+ } else if (BasicType.BIG_INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getObject(i);
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getLong(i);
+ } else if (BasicType.BIG_DECIMAL_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBigDecimal(i);
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getFloat(i);
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDouble(i);
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getString(i);
+ } else if (BasicType.DATE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getObject(i);
+ } else if
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getTime(i).toLocalTime();
+ } else if
(LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i).toLocalDate();
+ } else if
(LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getTimestamp(i).toLocalDateTime();
+ } else if
(PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBytes(i);
+ } else {
+ throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
+ }
+
+ fields.add(seatunnelField);
+ }
+
+ return new SeaTunnelRow(fields.toArray());
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
new file mode 100644
index 000000000..b50d149b6
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/**
+ * Converter that is responsible to convert between JDBC object and Seatunnel
data
+ * structure {@link SeaTunnelRow}.
+ */
+public interface JdbcRowConverter extends Serializable {
+
+ /**
+ * Convert data retrieved from {@link ResultSet} to internal {@link
SeaTunnelRow}.
+ *
+ * @param rs ResultSet from JDBC
+ */
+ SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData,
SeaTunnelRowType typeInfo) throws SQLException;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
new file mode 100644
index 000000000..155d8d4c2
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+
+import java.io.Serializable;
+
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system.
Dialects should be immutable
+ * and stateless.
+ */
+
+public interface JdbcDialect extends Serializable {
+
+ /**
+ * Get the name of jdbc dialect.
+ *
+ * @return the dialect name.
+ */
+ String dialectName();
+
+ /**
+ * Get converter that convert jdbc object to seatunnel internal object.
+ *
+ * @return a row converter for the database
+ */
+ JdbcRowConverter getRowConverter();
+
+
+ /**
+ * get jdbc meta-information type to seatunnel data type mapper.
+ * @return a type mapper for the database
+ */
+ JdbcDialectTypeMapper getJdbcDialectTypeMapper();
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
new file mode 100644
index 000000000..eabb406ae
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+/**
+ * A factory to create a specific {@link JdbcDialect}
+ * @see JdbcDialect
+ */
+
+public interface JdbcDialectFactory {
+
+ /**
+ * Retrieves whether the dialect thinks that it can open a connection to
the given URL.
+ * Typically, dialects will return <code>true</code> if they understand
the sub-protocol
+ * specified in the URL and <code>false</code> if they do not.
+ *
+ * @param url the URL of the database
+ * @return <code>true</code> if this dialect understands the given URL;
<code>false</code>
+ * otherwise.
+ */
+ boolean acceptsURL(String url);
+
+ /** @return Creates a new instance of the {@link JdbcDialect}. */
+ JdbcDialect create();
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
new file mode 100644
index 000000000..cd8efa909
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/** Utility for working with {@link JdbcDialect}. */
+public final class JdbcDialectLoader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcDialectLoader.class);
+
+ private JdbcDialectLoader() {}
+
+ /**
+ * Loads the unique JDBC Dialect that can handle the given database url.
+ *
+ * @param url A database URL.
+ * @throws IllegalStateException if the loader cannot find exactly one
dialect that can
+ * unambiguously process the given database URL.
+ * @return The loaded dialect.
+ */
+ public static JdbcDialect load(String url) {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
+
+ if (foundFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factories that
implement '%s' in the classpath.",
+ JdbcDialectFactory.class.getName()));
+ }
+
+ final List<JdbcDialectFactory> matchingFactories =
+ foundFactories.stream().filter(f ->
f.acceptsURL(url)).collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factory that can
handle url '%s' that implements '%s' in the classpath.\n\n"
+ + "Available factories are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ foundFactories.stream()
+ .map(f -> f.getClass().getName())
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new IllegalStateException(
+ String.format(
+ "Multiple jdbc dialect factories can handle url
'%s' that implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return matchingFactories.get(0).create();
+ }
+
+ private static List<JdbcDialectFactory> discoverFactories(ClassLoader
classLoader) {
+ try {
+ final List<JdbcDialectFactory> result = new LinkedList<>();
+ ServiceLoader.load(JdbcDialectFactory.class, classLoader)
+ .iterator()
+ .forEachRemaining(result::add);
+ return result;
+ } catch (ServiceConfigurationError e) {
+ LOG.error("Could not load service provider for jdbc dialects
factory.", e);
+ throw new RuntimeException(
+ "Could not load service provider for jdbc dialects
factory.", e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
new file mode 100644
index 000000000..acf296801
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/** Separate the jdbc meta-information type to SeaTunnelDataType into the
interface. */
+public interface JdbcDialectTypeMapper extends Serializable {
+
+ /**
+ * Convert ResultSetMetaData to Seatunnel data type {@link
SeaTunnelDataType}.
+ */
+ SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+ throws SQLException;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
new file mode 100644
index 000000000..56a7f8db6
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link MysqlDialect}. */
+
+@AutoService(JdbcDialectFactory.class)
+public class MySqlDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:mysql:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new MysqlDialect();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
new file mode 100644
index 000000000..3b853a2c3
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class MySqlTypeMapper implements JdbcDialectTypeMapper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcDialect.class);
+
+ // ============================data types=====================
+
+ private static final String MYSQL_UNKNOWN = "UNKNOWN";
+ private static final String MYSQL_BIT = "BIT";
+
+ // -------------------------number----------------------------
+ private static final String MYSQL_TINYINT = "TINYINT";
+ private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+ private static final String MYSQL_SMALLINT = "SMALLINT";
+ private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+ private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+ private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT
UNSIGNED";
+ private static final String MYSQL_INT = "INT";
+ private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+ private static final String MYSQL_INTEGER = "INTEGER";
+ private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+ private static final String MYSQL_BIGINT = "BIGINT";
+ private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+ private static final String MYSQL_DECIMAL = "DECIMAL";
+ private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+ private static final String MYSQL_FLOAT = "FLOAT";
+ private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+ private static final String MYSQL_DOUBLE = "DOUBLE";
+ private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+ // -------------------------string----------------------------
+ private static final String MYSQL_CHAR = "CHAR";
+ private static final String MYSQL_VARCHAR = "VARCHAR";
+ private static final String MYSQL_TINYTEXT = "TINYTEXT";
+ private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+ private static final String MYSQL_TEXT = "TEXT";
+ private static final String MYSQL_LONGTEXT = "LONGTEXT";
+ private static final String MYSQL_JSON = "JSON";
+
+ // ------------------------------time-------------------------
+ private static final String MYSQL_DATE = "DATE";
+ private static final String MYSQL_DATETIME = "DATETIME";
+ private static final String MYSQL_TIME = "TIME";
+ private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+ private static final String MYSQL_YEAR = "YEAR";
+
+ // ------------------------------blob-------------------------
+ private static final String MYSQL_TINYBLOB = "TINYBLOB";
+ private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+ private static final String MYSQL_BLOB = "BLOB";
+ private static final String MYSQL_LONGBLOB = "LONGBLOB";
+ private static final String MYSQL_BINARY = "BINARY";
+ private static final String MYSQL_VARBINARY = "VARBINARY";
+ private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int
colIndex) throws SQLException {
+ String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+ String columnName = metadata.getColumnName(colIndex);
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+ switch (mysqlType) {
+ case MYSQL_BIT:
+ return BasicType.BOOLEAN_TYPE;
+ case MYSQL_TINYINT:
+ case MYSQL_TINYINT_UNSIGNED:
+ case MYSQL_SMALLINT:
+ case MYSQL_SMALLINT_UNSIGNED:
+ case MYSQL_MEDIUMINT:
+ case MYSQL_MEDIUMINT_UNSIGNED:
+ case MYSQL_INT:
+ case MYSQL_INTEGER:
+ return BasicType.INT_TYPE;
+ case MYSQL_INT_UNSIGNED:
+ case MYSQL_INTEGER_UNSIGNED:
+ case MYSQL_BIGINT:
+ return BasicType.LONG_TYPE;
+ case MYSQL_BIGINT_UNSIGNED:
+ return BasicType.BIG_INT_TYPE;
+ case MYSQL_DECIMAL:
+ case MYSQL_DECIMAL_UNSIGNED:
+ return BasicType.BIG_DECIMAL_TYPE;
+ case MYSQL_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case MYSQL_FLOAT_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_FLOAT_UNSIGNED);
+ return BasicType.FLOAT_TYPE;
+ case MYSQL_DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case MYSQL_DOUBLE_UNSIGNED:
+ LOG.warn("{} will probably cause value overflow.",
MYSQL_DOUBLE_UNSIGNED);
+ return BasicType.DOUBLE_TYPE;
+ case MYSQL_CHAR:
+ case MYSQL_TINYTEXT:
+ case MYSQL_MEDIUMTEXT:
+ case MYSQL_TEXT:
+ case MYSQL_VARCHAR:
+ case MYSQL_JSON:
+ return BasicType.STRING_TYPE;
+ case MYSQL_LONGTEXT:
+ LOG.warn(
+ "Type '{}' has a maximum precision of 536870911 in MySQL. "
+ + "Due to limitations in the seatunnel type system, "
+ + "the precision will be set to 2147483647.",
+ MYSQL_LONGTEXT);
+ return BasicType.STRING_TYPE;
+
+ case MYSQL_YEAR:
+ case MYSQL_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case MYSQL_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case MYSQL_DATETIME:
+ case MYSQL_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+ //Doesn't support binary yet
+ case MYSQL_TINYBLOB:
+ case MYSQL_MEDIUMBLOB:
+ case MYSQL_BLOB:
+ case MYSQL_LONGBLOB:
+ case MYSQL_VARBINARY:
+ case MYSQL_BINARY:
+ return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+
+ case MYSQL_GEOMETRY:
+ case MYSQL_UNKNOWN:
+ default:
+ final String jdbcColumnName = metadata.getColumnName(colIndex);
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support MySQL type '%s' on column '%s' yet.",
+ mysqlType, jdbcColumnName));
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
new file mode 100644
index 000000000..3e4d77158
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class MysqlDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "MySQL";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new MysqlJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new MySqlTypeMapper();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
new file mode 100644
index 000000000..9e3b05c7e
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlJdbcRowConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class MysqlJdbcRowConverter extends AbstractJdbcRowConverter {
+ @Override
+ public String converterName() {
+ return "MySql";
+ }
+
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData,
SeaTunnelRowType typeInfo) throws SQLException {
+ return super.toInternal(rs, metaData, typeInfo);
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
new file mode 100644
index 000000000..19865fed7
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectionOptions.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+public class JdbcConnectionOptions
+ implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final int DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC = 30;
+ private static final int DEFAULT_MAX_RETRIES = 3;
+ private static final int DEFAULT_BATCH_SIZE = 300;
+ private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+ private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+ private static final int DEFAULT_TRANSACTION_TIMEOUT_SEC = -1;
+
+ public String url;
+ public String driverName;
+ public int connectionCheckTimeoutSeconds =
DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+ public int maxRetries = DEFAULT_MAX_RETRIES;
+ public String username;
+ public String password;
+ public String query;
+
+ public int batchSize = DEFAULT_BATCH_SIZE;
+ public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+ public String xaDataSourceClassName;
+
+ public int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+
+ public int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+ public JdbcConnectionOptions() {
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public int getConnectionCheckTimeoutSeconds() {
+ return connectionCheckTimeoutSeconds;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public Optional<String> getUsername() {
+ return Optional.ofNullable(username);
+ }
+
+ public Optional<String> getPassword() {
+ return Optional.ofNullable(password);
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getBatchIntervalMs() {
+ return batchIntervalMs;
+ }
+
+ public String getXaDataSourceClassName() {
+ return xaDataSourceClassName;
+ }
+
+ public int getMaxCommitAttempts() {
+ return maxCommitAttempts;
+ }
+
+ public Optional<Integer> getTransactionTimeoutSec() {
+ return transactionTimeoutSec < 0 ? Optional.empty() :
Optional.of(transactionTimeoutSec);
+ }
+
+ public static JdbcConnectionOptionsBuilder builder() {
+ return new JdbcConnectionOptionsBuilder();
+ }
+
+ public static final class JdbcConnectionOptionsBuilder {
+ private String url;
+ private String driverName;
+ private int connectionCheckTimeoutSeconds =
DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+ private int maxRetries = DEFAULT_MAX_RETRIES;
+ private String username;
+ private String password;
+ private String query;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+ private String xaDataSourceClassName;
+ private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+ private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+ private JdbcConnectionOptionsBuilder() {
+ }
+
+ public JdbcConnectionOptionsBuilder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
+ this.driverName = driverName;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder
withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withQuery(String query) {
+ this.query = query;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withBatchIntervalMs(int
batchIntervalMs) {
+ this.batchIntervalMs = batchIntervalMs;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withXaDataSourceClassName(String
xaDataSourceClassName) {
+ this.xaDataSourceClassName = xaDataSourceClassName;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withMaxCommitAttempts(int
maxCommitAttempts) {
+ this.maxCommitAttempts = maxCommitAttempts;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withTransactionTimeoutSec(int
transactionTimeoutSec) {
+ this.transactionTimeoutSec = transactionTimeoutSec;
+ return this;
+ }
+
+ public JdbcConnectionOptions build() {
+ JdbcConnectionOptions jdbcConnectionOptions = new
JdbcConnectionOptions();
+ jdbcConnectionOptions.batchSize = this.batchSize;
+ jdbcConnectionOptions.batchIntervalMs = this.batchIntervalMs;
+ jdbcConnectionOptions.driverName = this.driverName;
+ jdbcConnectionOptions.maxRetries = this.maxRetries;
+ jdbcConnectionOptions.password = this.password;
+ jdbcConnectionOptions.connectionCheckTimeoutSeconds =
this.connectionCheckTimeoutSeconds;
+ jdbcConnectionOptions.query = this.query;
+ jdbcConnectionOptions.url = this.url;
+ jdbcConnectionOptions.username = this.username;
+ jdbcConnectionOptions.transactionTimeoutSec =
this.transactionTimeoutSec;
+ jdbcConnectionOptions.maxCommitAttempts = this.maxCommitAttempts;
+ jdbcConnectionOptions.xaDataSourceClassName =
this.xaDataSourceClassName;
+ return jdbcConnectionOptions;
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
new file mode 100644
index 000000000..ca3a9d68d
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcGenericParameterValuesProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import java.io.Serializable;
+
+/**
+ * This splits generator actually does nothing but wrapping the query
parameters computed by the
+ * user before creating the {@link
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource} instance.
+ */
+public class JdbcGenericParameterValuesProvider implements
JdbcParameterValuesProvider {
+
+ private final Serializable[][] parameters;
+
+ public JdbcGenericParameterValuesProvider(Serializable[][] parameters) {
+ this.parameters = parameters;
+ }
+
+ @Override
+ public Serializable[][] getParameterValues() {
+ // do nothing...precomputed externally
+ return parameters;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
new file mode 100644
index 000000000..385bc0231
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcNumericBetweenParametersProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+
+/**
+ * This query parameters generator is an helper class to parameterize from/to
queries on a numeric
+ * column. The generated array of from/to values will be equally sized to
fetchSize (apart from the
+ * last one), ranging from minVal up to maxVal.
+ *
+ * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK
<CODE>id</CODE>, using a
+ * query like:
+ *
+ * <PRE>
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ *
+ * <p>You can take advantage of this class to automatically generate the
parameters of the BETWEEN
+ * clause, based on the passed constructor parameters.
+ */
+public class JdbcNumericBetweenParametersProvider implements
JdbcParameterValuesProvider {
+
+ private final long minVal;
+ private final long maxVal;
+
+ private long batchSize;
+ private int batchNum;
+
+ /**
+ * NumericBetweenParametersProviderJdbc constructor.
+ *
+ * @param minVal the lower bound of the produced "from" values
+ * @param maxVal the upper bound of the produced "to" values
+ */
+ public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
+ checkArgument(minVal <= maxVal, "minVal must not be larger than
maxVal");
+ this.minVal = minVal;
+ this.maxVal = maxVal;
+ }
+
+ /**
+ * NumericBetweenParametersProviderJdbc constructor.
+ *
+ * @param fetchSize the max distance between the produced from/to pairs
+ * @param minVal the lower bound of the produced "from" values
+ * @param maxVal the upper bound of the produced "to" values
+ */
+ public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal,
long maxVal) {
+ checkArgument(minVal <= maxVal, "minVal must not be larger than
maxVal");
+ this.minVal = minVal;
+ this.maxVal = maxVal;
+ ofBatchSize(fetchSize);
+ }
+
+ public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
+ checkArgument(batchSize > 0, "Batch size must be positive");
+
+ long maxElemCount = (maxVal - minVal) + 1;
+ if (batchSize > maxElemCount) {
+ batchSize = maxElemCount;
+ }
+ this.batchSize = batchSize;
+ this.batchNum = new Double(Math.ceil((double) maxElemCount /
batchSize)).intValue();
+ return this;
+ }
+
+ public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
+ checkArgument(batchNum > 0, "Batch number must be positive");
+
+ long maxElemCount = (maxVal - minVal) + 1;
+ if (batchNum > maxElemCount) {
+ batchNum = (int) maxElemCount;
+ }
+ this.batchNum = batchNum;
+ this.batchSize = new Double(Math.ceil((double) maxElemCount /
batchNum)).longValue();
+ return this;
+ }
+
+ @Override
+ public Serializable[][] getParameterValues() {
+ checkState(
+ batchSize > 0,
+ "Batch size and batch number must be positive. Have you called
`ofBatchSize` or `ofBatchNum`?");
+
+ long maxElemCount = (maxVal - minVal) + 1;
+ long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+ Serializable[][] parameters = new Serializable[batchNum][2];
+ long start = minVal;
+ for (int i = 0; i < batchNum; i++) {
+ long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+ parameters[i] = new Long[] {start, end};
+ start = end + 1;
+ }
+ return parameters;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
new file mode 100644
index 000000000..7b90bc8be
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/split/JdbcParameterValuesProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;
+
+import java.io.Serializable;
+
+/**
+ * This interface is used by the {@link
org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSource} to compute
the list of parallel query to
+ * run (i.e. splits). Each query will be parameterized using a row of the
matrix provided by each
+ * {@link JdbcParameterValuesProvider} implementation.
+ */
+public interface JdbcParameterValuesProvider {
+
+ /** Returns the necessary parameters array to use for query in parallel a
table. */
+ Serializable[][] getParameterValues();
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
index 2f8b78bee..6b323bc73 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
@@ -46,8 +46,8 @@ public interface XaFacade
extends JdbcConnectionProvider, Serializable, AutoCloseable {
static XaFacade fromJdbcConnectionOptions(
- JdbcConnectorOptions jdbcConnectorOptions) {
- return new XaFacadeImplAutoLoad(jdbcConnectorOptions);
+ JdbcConnectionOptions jdbcConnectionOptions) {
+ return new XaFacadeImplAutoLoad(jdbcConnectionOptions);
}
void open() throws Exception;
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
index d8c969ce7..32c64f44a 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
@@ -35,7 +35,7 @@ import static java.util.Optional.empty;
import static java.util.Optional.of;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable;
import org.slf4j.Logger;
@@ -74,14 +74,13 @@ public class XaFacadeImplAutoLoad
new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ,
XA_HEURMIX));
private static final int MAX_RECOVER_CALLS = 100;
- private final JdbcConnectorOptions jdbcConnectorOptions;
+ private final JdbcConnectionOptions jdbcConnectionOptions;
private transient XAResource xaResource;
private transient Connection connection;
private transient XAConnection xaConnection;
- XaFacadeImplAutoLoad(JdbcConnectorOptions jdbcConnectorOptions) {
- checkState(jdbcConnectorOptions.isExactlyOnce(), "is_exactly_once
config error");
- this.jdbcConnectorOptions = jdbcConnectorOptions;
+ XaFacadeImplAutoLoad(JdbcConnectionOptions jdbcConnectionOptions) {
+ this.jdbcConnectionOptions = jdbcConnectionOptions;
}
@Override
@@ -89,16 +88,16 @@ public class XaFacadeImplAutoLoad
checkState(!isOpen(), "already connected");
XADataSource ds;
try {
- ds = (XADataSource)
DataSourceUtils.buildCommonDataSource(jdbcConnectorOptions);
+ ds = (XADataSource)
DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
}
catch (Exception e) {
throw new SQLException(e);
}
xaConnection = ds.getXAConnection();
xaResource = xaConnection.getXAResource();
- if (jdbcConnectorOptions.getTransactionTimeoutSec().isPresent()) {
+ if (jdbcConnectionOptions.getTransactionTimeoutSec().isPresent()) {
try {
-
xaResource.setTransactionTimeout(jdbcConnectorOptions.getTransactionTimeoutSec().get());
+
xaResource.setTransactionTimeout(jdbcConnectionOptions.getTransactionTimeoutSec().get());
}
catch (XAException e) {
throw new SQLException(e);
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index 849ca50c8..b4527a9ff 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -23,11 +23,11 @@ import static
com.google.common.base.Preconditions.checkState;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
@@ -74,10 +74,10 @@ public class JdbcExactlyOnceSinkWriter
SinkWriter.Context sinkcontext,
SeaTunnelContext context,
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
- JdbcConnectorOptions jdbcConnectorOptions,
+ JdbcSinkOptions jdbcSinkOptions,
List<JdbcSinkState> states) {
checkArgument(
- jdbcConnectorOptions.getMaxRetries() == 0,
+ jdbcSinkOptions.getJdbcConnectionOptions().getMaxRetries() == 0,
"JDBC XA sink requires maxRetries equal to 0, otherwise it could "
+ "cause duplicates.");
@@ -85,13 +85,14 @@ public class JdbcExactlyOnceSinkWriter
this.sinkcontext = sinkcontext;
this.recoverStates = states;
this.xidGenerator = XidGenerator.semanticXidGenerator();
+ checkState(jdbcSinkOptions.isExactlyOnce(), "is_exactly_once config
error");
this.xaFacade = XaFacade.fromJdbcConnectionOptions(
- jdbcConnectorOptions);
+ jdbcSinkOptions.getJdbcConnectionOptions());
this.outputFormat = new JdbcOutputFormat<>(
xaFacade,
- jdbcConnectorOptions,
- () -> new
SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(),
statementBuilder));
+ jdbcSinkOptions.getJdbcConnectionOptions(),
+ () -> new
SimpleBatchStatementExecutor<>(jdbcSinkOptions.getJdbcConnectionOptions().getQuery(),
statementBuilder));
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index a225f38f8..219b00c4d 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -27,8 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -53,18 +53,18 @@ public class JdbcSink
private SeaTunnelContext seaTunnelContext;
- private JdbcConnectorOptions jdbcConnectorOptions;
+ private JdbcSinkOptions jdbcSinkOptions;
@Override
public String getPluginName() {
- return "jdbc";
+ return "Jdbc";
}
@Override
public void prepare(Config pluginConfig)
throws PrepareFailException {
this.pluginConfig = pluginConfig;
- this.jdbcConnectorOptions = new
JdbcConnectorOptions(this.pluginConfig);
+ this.jdbcSinkOptions = new JdbcSinkOptions(this.pluginConfig);
}
@Override
@@ -73,19 +73,19 @@ public class JdbcSink
SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> sinkWriter;
// TODO SeatunnelTyoeInfo is not good enough to get typesArray
JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) ->
JdbcUtils.setRecordToStatement(st, null, row);
- if (jdbcConnectorOptions.isExactlyOnce()) {
+ if (jdbcSinkOptions.isExactlyOnce()) {
sinkWriter = new JdbcExactlyOnceSinkWriter(
context,
seaTunnelContext,
statementBuilder,
- jdbcConnectorOptions,
+ jdbcSinkOptions,
new ArrayList<>()
);
} else {
sinkWriter = new JdbcSinkWriter(
context,
statementBuilder,
- jdbcConnectorOptions);
+ jdbcSinkOptions);
}
return sinkWriter;
@@ -94,13 +94,13 @@ public class JdbcSink
@Override
public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState>
restoreWriter(SinkWriter.Context context, List<JdbcSinkState> states)
throws IOException {
- if (jdbcConnectorOptions.isExactlyOnce()) {
+ if (jdbcSinkOptions.isExactlyOnce()) {
JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) ->
JdbcUtils.setRecordToStatement(st, null, row);
return new JdbcExactlyOnceSinkWriter(
context,
seaTunnelContext,
statementBuilder,
- jdbcConnectorOptions,
+ jdbcSinkOptions,
states
);
}
@@ -110,8 +110,8 @@ public class JdbcSink
@Override
public Optional<SinkAggregatedCommitter<XidInfo,
JdbcAggregatedCommitInfo>> createAggregatedCommitter()
throws IOException {
- if (jdbcConnectorOptions.isExactlyOnce()) {
- return Optional.of(new
JdbcSinkAggregatedCommitter(jdbcConnectorOptions));
+ if (jdbcSinkOptions.isExactlyOnce()) {
+ return Optional.of(new
JdbcSinkAggregatedCommitter(jdbcSinkOptions));
}
return Optional.empty();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
index e5c9f308b..f0ee2671e 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
@@ -36,15 +36,15 @@ public class JdbcSinkAggregatedCommitter
private final XaFacade xaFacade;
private final XaGroupOps xaGroupOps;
- private final JdbcConnectorOptions jdbcConnectorOptions;
+ private final JdbcSinkOptions jdbcSinkOptions;
public JdbcSinkAggregatedCommitter(
- JdbcConnectorOptions jdbcConnectorOptions
+ JdbcSinkOptions jdbcSinkOptions
) {
this.xaFacade = XaFacade.fromJdbcConnectionOptions(
- jdbcConnectorOptions);
+ jdbcSinkOptions.getJdbcConnectionOptions());
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
- this.jdbcConnectorOptions = jdbcConnectorOptions;
+ this.jdbcSinkOptions = jdbcSinkOptions;
}
private void tryOpen() throws IOException {
@@ -61,7 +61,7 @@ public class JdbcSinkAggregatedCommitter
public List<JdbcAggregatedCommitInfo>
commit(List<JdbcAggregatedCommitInfo> aggregatedCommitInfos) throws IOException
{
tryOpen();
return aggregatedCommitInfos.stream().map(aggregatedCommitInfo -> {
- GroupXaOperationResult<XidInfo> result =
xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false,
jdbcConnectorOptions.getMaxCommitAttempts());
+ GroupXaOperationResult<XidInfo> result =
xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false,
jdbcSinkOptions.getJdbcConnectionOptions().getMaxCommitAttempts());
return new JdbcAggregatedCommitInfo(result.getForRetry());
}).filter(ainfo ->
!ainfo.getXidInfoList().isEmpty()).collect(Collectors.toList());
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
index 626f379a5..f290eec86 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.api.sink.SinkCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
@@ -32,15 +32,15 @@ public class JdbcSinkCommitter
implements SinkCommitter<XidInfo> {
private final XaFacade xaFacade;
private final XaGroupOps xaGroupOps;
- private final JdbcConnectorOptions jdbcConnectorOptions;
+ private final JdbcConnectionOptions jdbcConnectionOptions;
public JdbcSinkCommitter(
- JdbcConnectorOptions jdbcConnectorOptions
+ JdbcConnectionOptions jdbcConnectionOptions
)
throws IOException {
- this.jdbcConnectorOptions = jdbcConnectorOptions;
+ this.jdbcConnectionOptions = jdbcConnectionOptions;
this.xaFacade = XaFacade.fromJdbcConnectionOptions(
- jdbcConnectorOptions);
+ jdbcConnectionOptions);
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
try {
xaFacade.open();
@@ -53,7 +53,7 @@ public class JdbcSinkCommitter
@Override
public List<XidInfo> commit(List<XidInfo> committables) {
return xaGroupOps
- .commit(committables, false,
jdbcConnectorOptions.getMaxCommitAttempts())
+ .commit(committables, false,
jdbcConnectionOptions.getMaxCommitAttempts())
.getForRetry();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 548f10735..f081ad1c0 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -19,13 +19,13 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -45,15 +45,15 @@ public class JdbcSinkWriter implements
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
public JdbcSinkWriter(
SinkWriter.Context context,
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
- JdbcConnectorOptions jdbcConnectorOptions) {
+ JdbcSinkOptions jdbcSinkOptions) {
- JdbcConnectionProvider connectionProvider = new
SimpleJdbcConnectionProvider(jdbcConnectorOptions);
+ JdbcConnectionProvider connectionProvider = new
SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
this.context = context;
this.outputFormat = new JdbcOutputFormat<>(
connectionProvider,
- jdbcConnectorOptions,
- () -> new
SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(),
statementBuilder));
+ jdbcSinkOptions.getJdbcConnectionOptions(),
+ () -> new
SimpleBatchStatementExecutor<>(jdbcSinkOptions.getJdbcConnectionOptions().getQuery(),
statementBuilder));
}
private void tryOpen() throws IOException {
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
new file mode 100644
index 000000000..41d07e92e
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+@AutoService(SeaTunnelSource.class)
+public class JdbcSource implements SeaTunnelSource<SeaTunnelRow,
JdbcSourceSplit, JdbcSourceState> {
+ protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+
+ private SeaTunnelContext seaTunnelContext;
+ private JdbcSourceOptions jdbcSourceOptions;
+ private SeaTunnelRowType typeInfo;
+
+ private JdbcDialect jdbcDialect;
+ private JdbcInputFormat inputFormat;
+ private PartitionParameter partitionParameter;
+ private JdbcConnectionProvider jdbcConnectionProvider;
+
+ private String query;
+
+ @Override
+ public String getPluginName() {
+ return "Jdbc";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ jdbcSourceOptions = new JdbcSourceOptions(pluginConfig);
+ jdbcConnectionProvider = new
SimpleJdbcConnectionProvider(jdbcSourceOptions.getJdbcConnectionOptions());
+ query = jdbcSourceOptions.getJdbcConnectionOptions().query;
+ jdbcDialect =
JdbcDialectLoader.load(jdbcSourceOptions.getJdbcConnectionOptions().getUrl());
+ try {
+ typeInfo =
initTableField(jdbcConnectionProvider.getOrEstablishConnection());
+ partitionParameter =
initPartitionParameterAndExtendSql(jdbcConnectionProvider.getOrEstablishConnection());
+ } catch (Exception e) {
+ throw new PrepareFailException("jdbc", PluginType.SOURCE,
e.toString());
+ }
+
+ inputFormat = new JdbcInputFormat(
+ jdbcConnectionProvider,
+ jdbcDialect.getRowConverter(),
+ typeInfo,
+ query,
+ 0,
+ true
+ );
+ }
+
+ @Override
+ public SeaTunnelContext getSeaTunnelContext() {
+ return seaTunnelContext;
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ Connection conn;
+ SeaTunnelRowType seaTunnelDataType = null;
+ try {
+ conn = jdbcConnectionProvider.getOrEstablishConnection();
+ seaTunnelDataType = initTableField(conn);
+ } catch (Exception e) {
+ LOG.warn("get row type info exception", e);
+ }
+ this.typeInfo = seaTunnelDataType;
+ return seaTunnelDataType;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, JdbcSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
+ return new JdbcSourceReader(inputFormat, readerContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
createEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext) throws Exception {
+ return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter);
+ }
+
+ @Override
+ public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceState checkpointState) throws Exception {
+ return new JdbcSourceSplitEnumerator(enumeratorContext,
jdbcSourceOptions, partitionParameter);
+ }
+
+ @Override
+ public Serializer<JdbcSourceState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+
+ private SeaTunnelRowType initTableField(Connection conn) {
+ JdbcDialectTypeMapper jdbcDialectTypeMapper =
jdbcDialect.getJdbcDialectTypeMapper();
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
+ try {
+ PreparedStatement ps =
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+ ResultSetMetaData resultSetMetaData = ps.getMetaData();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ fieldNames.add(resultSetMetaData.getColumnName(i));
+
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
+ }
+ } catch (Exception e) {
+ LOG.warn("get row type info exception", e);
+ }
+ return new SeaTunnelRowType(fieldNames.toArray(new
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+ }
+
+ private PartitionParameter initPartitionParameter(String columnName,
Connection connection) throws SQLException {
+ long max = Long.MAX_VALUE;
+ long min = Long.MIN_VALUE;
+ if (jdbcSourceOptions.getPartitionLowerBound().isPresent() &&
jdbcSourceOptions.getPartitionUpperBound().isPresent()) {
+ max = jdbcSourceOptions.getPartitionUpperBound().get();
+ min = jdbcSourceOptions.getPartitionLowerBound().get();
+ return new PartitionParameter(columnName, min, max);
+ }
+ try (ResultSet rs =
connection.createStatement().executeQuery(String.format("SELECT MAX(%s),MIN(%s)
" +
+ "FROM (%s) tt", columnName, columnName, query))) {
+ if (rs.next()) {
+ max = jdbcSourceOptions.getPartitionUpperBound().isPresent() ?
jdbcSourceOptions.getPartitionUpperBound().get() :
+ Long.parseLong(rs.getString(1));
+ min = jdbcSourceOptions.getPartitionLowerBound().isPresent() ?
jdbcSourceOptions.getPartitionLowerBound().get() :
+ Long.parseLong(rs.getString(2));
+ }
+ }
+ return new PartitionParameter(columnName, min, max);
+ }
+
+ private PartitionParameter initPartitionParameterAndExtendSql(Connection
connection) throws SQLException {
+ if (jdbcSourceOptions.getPartitionColumn().isPresent()) {
+ String partitionColumn =
jdbcSourceOptions.getPartitionColumn().get();
+ Map<String, SeaTunnelDataType<?>> fieldTypes = new HashMap<>();
+ for (int i = 0; i < typeInfo.getFieldNames().length; i++) {
+ fieldTypes.put(typeInfo.getFieldName(i),
typeInfo.getFieldType(i));
+ }
+ if (!fieldTypes.containsKey(partitionColumn)) {
+ throw new IllegalArgumentException(String.format("field %s not
contain in query %s",
+ partitionColumn, query));
+ }
+ SeaTunnelDataType<?> partitionColumnType =
fieldTypes.get(partitionColumn);
+ if (!isNumericType(partitionColumnType)) {
+ throw new IllegalArgumentException(String.format("%s is not
numeric type", partitionColumn));
+ }
+ PartitionParameter partitionParameter =
initPartitionParameter(partitionColumn, connection);
+ query = String.format("SELECT * FROM (%s) tt where " +
partitionColumn + " >= ? AND " + partitionColumn + " <= ?", query);
+
+ return partitionParameter;
+ } else {
+ LOG.info("The partition_column parameter is not configured, and
the source parallelism is set to 1");
+ }
+
+ return null;
+ }
+
+ private boolean isNumericType(SeaTunnelDataType<?> type) {
+ return type.equals(BasicType.INT_TYPE) ||
type.equals(BasicType.BIG_INT_TYPE)
+ || type.equals(BasicType.LONG_TYPE);
+ }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
new file mode 100644
index 000000000..3b222958c
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class JdbcSourceReader implements SourceReader<SeaTunnelRow,
JdbcSourceSplit> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
+
+ SourceReader.Context context;
+ Deque<JdbcSourceSplit> splits = new LinkedList<>();
+ JdbcInputFormat inputFormat;
+ boolean noMoreSplit;
+
+ public JdbcSourceReader(JdbcInputFormat inputFormat, SourceReader.Context
context) {
+ this.inputFormat = inputFormat;
+ this.context = context;
+ }
+
+ @Override
+ public void open() throws Exception {
+ inputFormat.openInputFormat();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputFormat.closeInputFormat();
+ }
+
+ @Override
+ @SuppressWarnings("magicnumber")
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ JdbcSourceSplit split = splits.poll();
+ if (null != split) {
+ inputFormat.open(split);
+ while (!inputFormat.reachedEnd()) {
+ SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
+ output.collect(seaTunnelRow);
+ }
+ inputFormat.close();
+ } else if (noMoreSplit) {
+ // signal to the source that we have reached the end of the data.
+ LOG.info("Closed the bounded jdbc source");
+ context.signalNoMoreElement();
+ } else {
+ Thread.sleep(1000L);
+ }
+ }
+
+ @Override
+ public List<JdbcSourceSplit> snapshotState(long checkpointId) throws
Exception {
+ return new ArrayList<>(splits);
+ }
+
+ @Override
+ public void addSplits(List<JdbcSourceSplit> splits) {
+ this.splits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
new file mode 100644
index 000000000..106cb439a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplit.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class JdbcSourceSplit implements SourceSplit {
+
+ Object[] parameterValues;
+ Integer splitId;
+
+ @Override
+ public String splitId() {
+ return splitId.toString();
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
new file mode 100644
index 000000000..f3b5368e9
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class JdbcSourceSplitEnumerator implements
SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
+
+ SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
+ List<JdbcSourceSplit> allSplit = new ArrayList<>();
+ JdbcSourceOptions jdbcSourceOptions;
+ PartitionParameter partitionParameter;
+
+ public
JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit>
enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter
partitionParameter) {
+ this.enumeratorContext = enumeratorContext;
+ this.jdbcSourceOptions = jdbcSourceOptions;
+ this.partitionParameter = partitionParameter;
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public void run() throws Exception {
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return 0;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ int parallelism = enumeratorContext.currentParallelism();
+ if (allSplit.isEmpty()) {
+ if (null != partitionParameter) {
+ JdbcNumericBetweenParametersProvider
jdbcNumericBetweenParametersProvider = new
JdbcNumericBetweenParametersProvider(partitionParameter.minValue,
partitionParameter.maxValue).ofBatchNum(parallelism);
+ Serializable[][] parameterValues =
jdbcNumericBetweenParametersProvider.getParameterValues();
+ for (int i = 0; i < parameterValues.length; i++) {
+ allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+ }
+ } else {
+ allSplit.add(new JdbcSourceSplit(null, 0));
+ }
+ }
+ // Filter the split that the current task needs to run
+ List<JdbcSourceSplit> splits = allSplit.stream().filter(p -> p.splitId
% parallelism == subtaskId).collect(Collectors.toList());
+ enumeratorContext.assignSplit(subtaskId, splits);
+ enumeratorContext.signalNoMoreSplits(subtaskId);
+ }
+
+ @Override
+ public JdbcSourceState snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
new file mode 100644
index 000000000..c16ebb46d
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/PartitionParameter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PartitionParameter implements Serializable {
+
+ String partitionColumnName;
+ Long minValue;
+ Long maxValue;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
new file mode 100644
index 000000000..be9e088ec
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import java.io.Serializable;
+
+public class JdbcSourceState implements Serializable {
+}